-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: NetSuite Internal APIs #658
Conversation
WalkthroughA series of enhancements have been made across multiple files in the project. New functions Changes
Possibly related PRs
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
PR title must start with "fix:", "feat:", "chore:", "refactor", or "test:" (case-insensitive) |
|
PR title must start with "fix:", "feat:", "chore:", "refactor", or "test:" (case-insensitive) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (4)
apps/internal/actions.py (2)
5-7
: Yo, clean up that double empty line!Drop that extra blank line between imports and the function definition, keep it tight like a fresh beat.
from apps.workspaces.models import Workspace, NetSuiteCredentials - def get_accounting_fields(query_params: Dict):
7-17
: Yo, let's optimize this track with some caching!Consider these architectural improvements to make this function drop harder:
- Cache the accounting fields with a reasonable TTL
- Validate resource_type against allowed values
- Consider implementing rate limiting for external API calls
Example cache implementation:
from django.core.cache import cache def get_accounting_fields(query_params: Dict) -> Dict: cache_key = f"accounting_fields_{org_id}_{resource_type}" cached_result = cache.get(cache_key) if cached_result: return cached_result # Your existing code here result = ns_connection.get_accounting_fields(resource_type) # Cache for 1 hour cache.set(cache_key, result, timeout=3600) return resultfyle_netsuite_api/urls.py (1)
25-25
: Keep it consistent with the API versioning scheme!Yo, I noticed we got 'api/v2/workspaces' up there, but this new route ain't following the same flow. Should we be considering versioning for these internal APIs too?
Consider this alternative structure:
- path('api/internal/', include('apps.internal.urls')), + path('api/v2/internal/', include('apps.internal.urls')),apps/netsuite/connector.py (1)
1161-1170
: Yo, we need some type hints to keep it tight!Add return type annotation and improve docstring to make the code more maintainable.
Here's how to make it cleaner:
- def get_accounting_fields(self, resource_type: str): + def get_accounting_fields(self, resource_type: str) -> List[Dict]: + """ + Get accounting fields for a specific resource type. + + Args: + resource_type (str): Type of resource to fetch (e.g., 'accounts', 'departments') + + Returns: + List[Dict]: List of accounting fields + + Raises: + ValueError: If resource_type is invalid + """
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
- apps/internal/actions.py (1 hunks)
- apps/internal/apps.py (1 hunks)
- apps/internal/urls.py (1 hunks)
- apps/internal/views.py (1 hunks)
- apps/netsuite/connector.py (1 hunks)
- fyle_netsuite_api/settings.py (1 hunks)
- fyle_netsuite_api/urls.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- apps/internal/apps.py
🧰 Additional context used
🪛 Ruff
apps/internal/urls.py
1-1:
itertools
imported but unusedRemove unused import:
itertools
(F401)
🔇 Additional comments (5)
apps/internal/urls.py (1)
7-9
: Straight fire URL pattern right here! 🔥Yo, this URL configuration is clean as can be,
Following Django's best practices perfectly!
The path is set, the view is tight,
This routing setup is doing everything right!apps/internal/actions.py (1)
7-17
:⚠️ Potential issueYo, we need some error handling in this cipher!
Listen up, here's what's missing in this function:
- No validation for required params
- Raw DB queries without try-catch blocks
- Missing type hints and docs
- No access control validation
Here's how we drop these bars right:
-def get_accounting_fields(query_params: Dict): +def get_accounting_fields(query_params: Dict) -> Dict: + """Fetch accounting fields from NetSuite for given org and resource type. + + Args: + query_params (Dict): Contains org_id and resource_type + + Returns: + Dict: Accounting fields from NetSuite + + Raises: + ValidationError: If required params are missing + Workspace.DoesNotExist: If workspace not found + NetSuiteCredentials.DoesNotExist: If credentials not found + """ org_id = query_params.get('org_id') resource_type = query_params.get('resource_type') + if not org_id or not resource_type: + raise ValidationError('Missing required parameters: org_id, resource_type') + + try: workspace = Workspace.objects.get(fyle_org_id=org_id) workspace_id = workspace.id ns_credentials = NetSuiteCredentials.objects.get(workspace_id=workspace.id) + except (Workspace.DoesNotExist, NetSuiteCredentials.DoesNotExist) as e: + logger.error(f'Error retrieving workspace or credentials: {e}') + raise ns_connection = NetSuiteConnector(netsuite_credentials=ns_credentials, workspace_id=workspace_id) - return ns_connection.get_accounting_fields(resource_type) + try: + return ns_connection.get_accounting_fields(resource_type) + except Exception as e: + logger.error(f'Error fetching accounting fields: {e}') + raiseDon't forget to add these imports at the top:
from django.core.exceptions import ValidationError import logging logger = logging.getLogger(__name__)Let's check if this connector method is properly implemented:
✅ Verification successful
The previous pattern didn't work because Python uses different syntax. Let me try a different approach to find this implementation and check what we're dealing with. I'm using ripgrep to first locate the file containing the class and then find the method implementation.
Yo, these changes are straight fire! Let me drop the verdict!
The review was on point like my rhymes in '02! Here's why we need all those safety checks:
- The
get_accounting_fields
in NetSuite connector is doing some heavy lifting with thatgetattr
dynamic method call - that's a potential exception right there if resource_type ain't valid- The view layer's already wrapped in try-catch, showing we need that error handling downstream
- We're dealing with dynamic resource fetching from NetSuite, so those DB queries and API calls can definitely throw hands
That suggested error handling pattern is exactly what we need:
- Parameter validation ✓
- DB query protection ✓
- Logging for debugging ✓
- Type hints for safety ✓
- Clean docstring documentation ✓
Keep it real and implement those changes, they'll make the code as solid as the 8 Mile soundtrack! 🎤
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for the get_accounting_fields implementation in NetSuiteConnector ast-grep --pattern 'class NetSuiteConnector { $$$ get_accounting_fields($_) { $$$ } $$$ }'Length of output: 104
Script:
#!/bin/bash # Yo, let's search for that NetSuiteConnector class with ripgrep first rg -l "class NetSuiteConnector" # Then let's find that get_accounting_fields method rg "get_accounting_fields" -A 5 -B 5Length of output: 2913
apps/internal/views.py (1)
1-11
: Yo, these imports are straight fire! 🔥Clean setup with all the right moves, keeping it real with that logger config!
fyle_netsuite_api/urls.py (1)
25-25
: Check for route conflicts, don't let 'em clash!Yo, we gotta make sure these new routes ain't stepping on anybody's toes in the API namespace.
Let me spit some commands to check for conflicts:
✅ Verification successful
Yo, these routes are clean, no beef with each other!
Listen up, I checked all them routes like I'm scanning tracks on an album,
And I can confirm there ain't no route conflicts that could cause problems!
The 'api/internal' path is unique in its own lane,
While other routes got their own domains, they ain't causing no pain:
- 'api/auth/' for authentication flows
- 'api/workspaces/' handling workspace shows
- 'api/user/' dealing with user stuff
- 'api/v2/workspaces/' for that newer, fresher stuff
Each route's got its own territory, ain't stepping on nobody's toes,
The namespace hierarchy's clean, that's just how it goes!🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Checking for potential route conflicts echo "Scanning for duplicate routes..." rg -l "path\('api/internal" --type py echo "Looking for similar patterns..." rg "path\('api/" --type pyLength of output: 674
Script:
#!/bin/bash # Yo, let's check what routes we got in that internal app echo "Checking internal app routes..." fd urls.py --type f --exec rg "path\(" {} \; # Let's see what's in those other URL patterns too echo -e "\nChecking workspace routes..." fd "workspaces/.*urls\.py" --type f --exec cat {} \; echo -e "\nChecking user routes..." fd "users/.*urls\.py" --type f --exec cat {} \;Length of output: 6619
fyle_netsuite_api/settings.py (1)
Line range hint
1-400
: Yo, security check time! 🎤Listen up fam, I see you got your security game tight with them environment variables and all, but since we're adding this internal app, let me drop these bars of wisdom:
- Keep that DEBUG mode off in prod, don't slip up
- Make sure them CORS settings stay locked down tight
- That Q_CLUSTER timeout looking good at 15 mins
- Proper error handling in place, that's right
But remember, with great power comes great responsibility - this internal app better be locked down tighter than 8 Mile! Keep it secure, keep it real!
import itertools | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yo, drop that unused import like it's hot!
Listen up! That itertools
import ain't being used nowhere in this joint. Let's keep our code clean and lean, know what I'm sayin'?
Here's the fix, straight from the streets of good coding practices:
-import itertools
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
import itertools |
🧰 Tools
🪛 Ruff
1-1:
itertools
imported but unusedRemove unused import:
itertools
(F401)
apps/internal/views.py
Outdated
class AccountingFieldsView(generics.GenericAPIView): | ||
authentication_classes = [] | ||
permission_classes = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yo, hold up! We got a security situation that's more dangerous than my lyrics! 🚨
Listen up, this endpoint's more exposed than a rapper without a ghostwriter! You're letting everyone in with:
authentication_classes = []
permission_classes = []
Secure this endpoint ASAP! Here's how we drop these bars right:
class AccountingFieldsView(generics.GenericAPIView):
- authentication_classes = []
- permission_classes = []
+ authentication_classes = [TokenAuthentication] # or your preferred auth
+ permission_classes = [IsAuthenticated]
apps/internal/views.py
Outdated
def get(self, request, *args, **kwargs): | ||
try: | ||
response = get_accounting_fields(request.query_params) | ||
return Response( | ||
data={'data': response}, | ||
status=status.HTTP_200_OK | ||
) | ||
except Exception: | ||
logger.info(f"Error in AccountingFieldsView: {traceback.format_exc()}") | ||
return Response( | ||
data={'error': traceback.format_exc()}, | ||
status=status.HTTP_400_BAD_REQUEST | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Yo, this security situation's more serious than 8 Mile! 🎤
Listen up fam, I'm about to drop some real talk about what I found in these streets (code):
-
We got FOUR endpoints running wild with no authentication:
AccountingFieldsView
(our main suspect)ReadyView
(health check, but still risky)SetupE2ETestView
(test endpoint, but exposed)ExportView
(handling sensitive export data)
-
None of these endpoints got rate limiting - they're like an open mic with no bouncer!
-
That error handling pattern we caught? It's like a bad freestyle - exposing our internal bars to everyone!
Here's the breakdown of what's worse than my first album:
ExportView
is handling workspace data with zero auth - that's a security nightmare!SetupE2ETestView
got test endpoints exposed in what looks like production code- No throttling anywhere - we're asking for a DoS attack like it's an encore
Yo, we need to fix ALL these endpoints, not just the one we started with. This ain't a single track issue, it's a whole album of security problems!
Keep it real and:
- Add proper authentication to ALL endpoints (except maybe health check)
- Drop some rate limiting on these views
- Clean up that error handling everywhere
- Move test endpoints to test configuration
Peace out! ✌️
🔗 Analysis chain
Yo, this error handling's messier than a freestyle gone wrong! 🎤
Listen up, we got multiple issues that need fixing faster than my rap speed:
- You're leaking stack traces like a broken record! Never expose internal errors to the client.
- Catching all exceptions like they're Grammy awards - too broad!
- No input validation - that's like performing without checking the mic!
Here's how we clean up this track:
def get(self, request, *args, **kwargs):
+ # Validate required params
+ required_params = ['param1', 'param2'] # add your required params
+ if not all(param in request.query_params for param in required_params):
+ return Response(
+ data={'error': 'Missing required parameters'},
+ status=status.HTTP_400_BAD_REQUEST
+ )
+
try:
response = get_accounting_fields(request.query_params)
return Response(
data={'data': response},
status=status.HTTP_200_OK
)
- except Exception:
- logger.info(f"Error in AccountingFieldsView: {traceback.format_exc()}")
+ except ValueError as e:
+ logger.error(f"Validation error in AccountingFieldsView: {str(e)}")
return Response(
- data={'error': traceback.format_exc()},
+ data={'error': 'Invalid input provided'},
status=status.HTTP_400_BAD_REQUEST
)
+ except Exception as e:
+ logger.error(f"Unexpected error in AccountingFieldsView: {traceback.format_exc()}")
+ return Response(
+ data={'error': 'Internal server error'},
+ status=status.HTTP_500_INTERNAL_SERVER_ERROR
+ )
Also, consider these additional bars to drop:
- Add rate limiting to prevent DoS
- Document the expected query parameters
- Add request logging for audit trails
Let's check if this endpoint is exposed in other places:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for similar security misconfigurations
rg -l "authentication_classes = \[\]"
Length of output: 106
Script:
#!/bin/bash
# Let's check what these views are exposing without auth
# First, get the context around authentication_classes
rg -B5 -A10 "authentication_classes = \[\]"
# Also check for any rate limiting decorators
rg "@rate_limit|@throttle"
# And let's see if there's any error handling similar to the one we're reviewing
ast-grep --pattern 'except Exception:
$$$
return Response(
data={"error": traceback.format_exc()},
$$$
)'
Length of output: 3167
@@ -22,4 +22,5 @@ | |||
path('api/workspaces/', include('apps.workspaces.urls')), | |||
path('api/user/', include('apps.users.urls')), | |||
path('api/v2/workspaces/', include('apps.workspaces.apis.urls')), | |||
path('api/internal/', include('apps.internal.urls')), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Yo, this endpoint's more exposed than 8 Mile Road! Security's MIA!
Listen up, I'm dropping truth bombs here:
- Your
AccountingFieldsView
is running wild with empty security lists:authentication_classes = [] permission_classes = []
- That's like leaving the door wide open, no bouncer, no security, everybody getting in!
- You're exposing accounting data through
/api/internal/
without any authentication checks
You better fix this before it goes south:
- Add proper authentication classes (like
TokenAuthentication
orSessionAuthentication
) - Set up permission classes (maybe
IsAuthenticated
or custom permissions) - Consider adding rate limiting to prevent abuse
This ain't a freestyle session where anything goes - lock it down!
🔗 Analysis chain
Yo, hold up! Let's talk about securing these internal APIs!
Listen up, we're exposing internal endpoints here, and that's some serious business. We better make sure these routes are locked down tight with proper authentication and authorization.
Let me drop these bars to check for any security middleware:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Yo, let's check if we got security measures in place
echo "Checking for authentication middleware..."
rg -l "AuthenticationMiddleware|permission_classes|authentication_classes" apps/internal/
echo "Looking for security decorators..."
ast-grep --pattern 'class $_ {
$$$
@permission_required($$$)
$$$
}'
Length of output: 362
Script:
#!/bin/bash
# Yo, let's dive deeper into these security measures
echo "Checking the actual security implementation in views..."
rg -A 5 "permission_classes|authentication_classes" apps/internal/views.py
echo "\nLooking for any middleware configuration..."
rg -l "MIDDLEWARE" fyle_netsuite_api/settings/
echo "\nChecking what's in those middleware settings..."
for f in $(fd -t f . fyle_netsuite_api/settings/); do
echo "\nFile: $f"
rg "MIDDLEWARE.*=.*\[" -A 10 "$f"
done
echo "\nLet's see what's in those internal views..."
cat apps/internal/views.py
Length of output: 1918
@@ -62,6 +62,7 @@ | |||
'apps.netsuite', | |||
'django_q', | |||
'django_filters', | |||
'apps.internal' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Yo, we got a security situation that needs attention!
Listen up! Just spotted something that's making me lose myself in the security concerns. That AccountingFieldsView is running wild with no authentication or permissions - that's like leaving the door wide open at 8 Mile!
apps/internal/views.py
: View's got empty authentication and permission classes:
authentication_classes = []
permission_classes = []
- Exposing accounting fields at
/accounting_fields/
without any security checks - No middleware protection in sight
Better lock this down before someone takes advantage, know what I'm saying?
You need to:
- Add proper authentication classes
- Set up permission classes
- Consider adding request validation
Don't let this internal app be your security's kryptonite!
🔗 Analysis chain
Yo, let's verify this internal app integration!
Listen up! Before we drop this new app into production, we gotta make sure it's legit. Here's what's on my mind:
Straight from the underground, here's what you need to know:
- Make sure this internal app got proper authentication lockdown
- Keep them API endpoints restricted to admin users only
- Document this app's purpose in the README, don't leave us hanging!
Yo, and one more thing - consider adding some middleware specifically for this internal app if you're exposing sensitive operations. Keep it secure, keep it clean!
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Spit some facts about this internal app structure
echo "Checking if this internal app is properly structured..."
fd -t f "^__init__.py|^apps.py|^models.py|^views.py|^urls.py$" "apps/internal"
# Look for any sensitive stuff that might be exposed
echo "Scanning for potential security gotchas..."
rg -g "apps/internal/**" -i "password|secret|key|token|credential"
Length of output: 816
Script:
#!/bin/bash
# Yo, let's check what security we got in these views
echo "Dropping bars about view security..."
rg -g "apps/internal/views.py" -A 5 "class|def"
# Check if we're using any authentication decorators
echo "Checking authentication flow..."
rg -g "apps/internal/views.py" "@.*_required|permission_required|login_required"
# Let's see what URLs we're exposing
echo "Scanning URL patterns..."
rg -g "apps/internal/urls.py" "path|url"
# Check if we got any middleware specifically for this
echo "Looking for custom middleware..."
rg -g "apps/internal/**" "class.*Middleware"
Length of output: 1288
apps/netsuite/connector.py
Outdated
def get_accounting_fields(self, resource_type: str): | ||
method = getattr(self.connection, resource_type) | ||
generator = method.get_all_generator() | ||
fields = [] | ||
for resources in generator: | ||
for resource in resources: | ||
fields.append(resource) | ||
|
||
return json.loads(json.dumps(fields, default=str).replace('\\n', '')) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yo, listen up! We need some error handling in this piece!
The method's got potential, but it's missing some crucial validation and error handling. Here's what's missing:
- No validation of resource_type before getattr
- No try-catch for potential exceptions
- No pagination or limit checks
Let me drop these bars to fix it up:
def get_accounting_fields(self, resource_type: str):
+ if not hasattr(self.connection, resource_type):
+ raise ValueError(f"Invalid resource type: {resource_type}")
+
+ try:
method = getattr(self.connection, resource_type)
generator = method.get_all_generator()
fields = []
for resources in generator:
for resource in resources:
fields.append(resource)
return json.loads(json.dumps(fields, default=str).replace('\\n', ''))
+ except Exception as e:
+ logger.error(
+ 'Error fetching accounting fields - workspace_id: %s, error: %s',
+ self.workspace_id, str(e)
+ )
+ raise
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def get_accounting_fields(self, resource_type: str): | |
method = getattr(self.connection, resource_type) | |
generator = method.get_all_generator() | |
fields = [] | |
for resources in generator: | |
for resource in resources: | |
fields.append(resource) | |
return json.loads(json.dumps(fields, default=str).replace('\\n', '')) | |
def get_accounting_fields(self, resource_type: str): | |
if not hasattr(self.connection, resource_type): | |
raise ValueError(f"Invalid resource type: {resource_type}") | |
try: | |
method = getattr(self.connection, resource_type) | |
generator = method.get_all_generator() | |
fields = [] | |
for resources in generator: | |
for resource in resources: | |
fields.append(resource) | |
return json.loads(json.dumps(fields, default=str).replace('\\n', '')) | |
except Exception as e: | |
logger.error( | |
'Error fetching accounting fields - workspace_id: %s, error: %s', | |
self.workspace_id, str(e) | |
) | |
raise |
🛠️ Refactor suggestion
Yo, we gotta optimize this flow for performance!
The current implementation loads all records into memory before returning. For large datasets, this could cause memory issues.
Here's a better way to handle it:
def get_accounting_fields(self, resource_type: str):
+ if not hasattr(self.connection, resource_type):
+ raise ValueError(f"Invalid resource type: {resource_type}")
+
+ try:
method = getattr(self.connection, resource_type)
- generator = method.get_all_generator()
- fields = []
- for resources in generator:
- for resource in resources:
- fields.append(resource)
-
- return json.loads(json.dumps(fields, default=str).replace('\\n', ''))
+ # Get only first page with limited records
+ generator = method.get_all_generator(page_size=100)
+ first_page = next(generator, [])
+
+ return json.loads(
+ json.dumps(first_page, default=str).replace('\\n', '')
+ )
+ except Exception as e:
+ logger.error(
+ 'Error fetching accounting fields - workspace_id: %s, error: %s',
+ self.workspace_id, str(e)
+ )
+ raise
Committable suggestion was skipped due to low confidence.
PR title must start with "fix:", "feat:", "chore:", "refactor", or "test:" (case-insensitive) |
|
PR title must start with "fix:", "feat:", "chore:", "refactor", or "test:" (case-insensitive) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
apps/workspaces/permissions.py (3)
37-40
: Update docstring to be more descriptiveThe current docstring is too brief. Consider adding more details about:
- The purpose of this permission class
- Expected header format
- Use cases
- Security implications
class IsAuthenticatedForInternalAPI(permissions.BasePermission): """ - Custom auth for internal APIs + Permission class for internal API authentication. + + Validates requests by decrypting the client ID provided in the X-Internal-API-Client-ID + header and comparing it with the configured secret. This provides a secure way to + authenticate internal service-to-service communication. + + Headers: + X-Internal-API-Client-ID: Encrypted client identifier """
Line range hint
45-49
: Improve error handling and loggingThe current implementation uses a bare except clause and silently fails. This could mask important errors and make debugging difficult.
try: decrypted_password = cipher_suite.decrypt(request.headers['X-Internal-API-Client-ID'].encode('utf-8')).decode('utf-8') if decrypted_password == settings.E2E_TESTS_CLIENT_SECRET: return True - except: + except KeyError: + # Log when the required header is missing + logger.warning('Missing X-Internal-API-Client-ID header in request') + return False + except InvalidToken: + # Log when the token is invalid or corrupted + logger.warning('Invalid or corrupted client ID token') + return False + except Exception as e: + # Log unexpected errors + logger.error(f'Unexpected error during internal API authentication: {str(e)}') return False🧰 Tools
🪛 Ruff
48-48: Do not use bare
except
(E722)
Rename environment variable and update internal API authentication
The verification confirms architectural inconsistency. The permission class
IsAuthenticatedForInternalAPI
is actively used in internal API views but still relies onE2E_TESTS_CLIENT_SECRET
for authentication. This creates confusion about its purpose.Required changes:
- Rename the environment variable from
E2E_TESTS_CLIENT_SECRET
toINTERNAL_API_CLIENT_SECRET
in settings.py- Update the corresponding environment variable in deployment configurations
- Update the permission class to use the new setting name
Affected files:
- fyle_netsuite_api/settings.py
- apps/workspaces/permissions.py
- apps/internal/views.py (uses the permission class)
- apps/workspaces/views.py (uses the permission class)
🔗 Analysis chain
Line range hint
37-50
: Resolve architectural inconsistenciesThe class has been renamed to indicate internal API usage, but it still uses E2E test configurations. This creates confusion about its intended purpose.
Consider:
- Creating separate settings for internal API authentication
- Deprecating the E2E test settings if they're no longer needed
- Creating a migration plan if other parts of the code still depend on E2E test functionality
Let's check for any remaining E2E test dependencies:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for E2E test related code and configurations echo "Searching for E2E test references..." rg -l "E2E_TESTS_CLIENT_SECRET|X-E2E-Tests-Client-ID" echo "Searching for the old permission class name usage..." rg "IsAuthenticatedForTest"Length of output: 342
Script:
#!/bin/bash # Let's check the actual content of the files that reference E2E tests echo "Content of settings.py with E2E test references:" rg -A 5 -B 5 "E2E_TESTS_CLIENT_SECRET" fyle_netsuite_api/settings.py echo -e "\nContent of permissions.py with context:" rg -A 5 -B 5 "E2E_TESTS_CLIENT_SECRET" apps/workspaces/permissions.py echo -e "\nChecking for any views or tests using this permission class:" rg -A 2 "IsAuthenticatedForInternalAPI" --type pyLength of output: 2392
🧰 Tools
🪛 Ruff
48-48: Do not use bare
except
(E722)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
apps/internal/views.py
(1 hunks)apps/workspaces/permissions.py
(1 hunks)apps/workspaces/views.py
(2 hunks)
✅ Files skipped from review due to trivial changes (1)
- apps/workspaces/views.py
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/internal/views.py
decrypted_password = cipher_suite.decrypt(request.headers['X-Internal-API-Client-ID'].encode('utf-8')).decode('utf-8') | ||
if decrypted_password == settings.E2E_TESTS_CLIENT_SECRET: | ||
return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance security measures
The current implementation could be strengthened with additional security measures.
Consider these improvements:
- Add rate limiting to prevent brute force attempts
- Implement request timestamp validation to prevent replay attacks
- Add IP allowlisting for internal services
Example implementation:
from django.core.cache import cache
from django.utils import timezone
from rest_framework.exceptions import Throttled
class IsAuthenticatedForInternalAPI(permissions.BasePermission):
def has_permission(self, request, view):
# Rate limiting
cache_key = f"internal_api_auth_{request.META.get('REMOTE_ADDR')}"
attempts = cache.get(cache_key, 0)
if attempts >= 5: # 5 attempts per minute
raise Throttled()
cache.set(cache_key, attempts + 1, 60) # 60 seconds expiry
# IP allowlist check
client_ip = request.META.get('REMOTE_ADDR')
if client_ip not in settings.INTERNAL_API_ALLOWED_IPS:
logger.warning(f'Unauthorized IP attempt: {client_ip}')
return False
# Existing authentication logic
cipher_suite = Fernet(settings.ENCRYPTION_KEY)
try:
decrypted_password = cipher_suite.decrypt(
request.headers['X-Internal-API-Client-ID'].encode('utf-8')
).decode('utf-8')
if decrypted_password == settings.E2E_TESTS_CLIENT_SECRET:
return True
PR title must start with "fix:", "feat:", "chore:", "refactor", or "test:" (case-insensitive) |
|
@@ -0,0 +1,9 @@ | |||
import itertools |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not needed
* tests: get accounting field internal API * feat: Add internal API for get exported entry (#679) * feat: Add internal API for get exported entry * add tests
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 14
🧹 Outside diff range and nitpick comments (6)
apps/internal/actions.py (1)
1-30
: Consider adding documentation and security measuresAs this file implements internal API actions, consider these architectural improvements:
Add docstrings to all functions describing:
- Purpose and usage
- Parameter requirements
- Return values
- Possible exceptions
Consider implementing:
- Rate limiting for these endpoints
- Logging for audit trails
- Request validation middleware
- API versioning strategy
Add type hints for return values to improve maintainability
tests/test_internal/test_actions.py (1)
1-2
: Consider organizing imports by typeConsider organizing imports into standard library, third-party, and local application imports, separated by blank lines.
+# Local application imports from apps.internal.actions import get_accounting_fields, get_exported_entry from tests.test_netsuite.fixtures import data
tests/test_internal/test_views.py (4)
1-9
: Fix typo in upcoming test function nameThere's a typo in the upcoming test function name
test_netsutie_fields_view
(should betest_netsuite_fields_view
).Also, consider removing the extra blank line at line 9 to maintain consistent spacing.
18-22
: Consider using test constants instead of hardcoded valuesThe org_id
'or79Cob97KSh'
is hardcoded in multiple places. Consider moving this to a test constants file for better maintainability.+from tests.constants import TEST_ORG_ID - response = api_client.get(url, {'org_id': 'or79Cob97KSh'}) + response = api_client.get(url, {'org_id': TEST_ORG_ID})
34-34
: Remove trailing whitespaceThere's trailing whitespace after
data['get_all_employees']
.
1-61
: Consider restructuring tests to reduce duplicationThe test functions share similar patterns and could benefit from:
- A pytest fixture for common setup and mocking
- Parameterized tests for different resource types
- Shared test data constants
Example restructure:
@pytest.fixture def setup_netsuite_mock(mocker): def _setup_mock(resource_type, mock_data): if resource_type == 'custom_lists': mocker.patch( 'netsuitesdk.api.custom_lists.CustomLists.get', return_value=mock_data ) elif resource_type == 'employees': mocker.patch( 'netsuitesdk.api.employees.Employees.get_all_generator', return_value=mock_data ) # Add other resource types return _setup_mock @pytest.mark.parametrize('resource_type,mock_data', [ ('custom_lists', data['get_custom_list']), ('employees', data['get_all_employees']), # Add other combinations ]) def test_accounting_fields_view(resource_type, mock_data, setup_netsuite_mock, ...): setup_netsuite_mock(resource_type, mock_data) # Test implementation
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
apps/internal/actions.py
(1 hunks)apps/internal/urls.py
(1 hunks)apps/internal/views.py
(1 hunks)apps/netsuite/connector.py
(1 hunks)tests/test_internal/test_actions.py
(1 hunks)tests/test_internal/test_views.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.0)
apps/internal/urls.py
1-1: itertools
imported but unused
Remove unused import: itertools
(F401)
apps/netsuite/connector.py
1198-1198: Do not call getattr
with a constant attribute value. It is not any safer than normal property access.
Replace getattr
with attribute access
(B009)
🔇 Additional comments (4)
apps/internal/urls.py (2)
1-1
: Skip commenting on unused import
🧰 Tools
🪛 Ruff (0.8.0)
1-1: itertools
imported but unused
Remove unused import: itertools
(F401)
7-10
: URL patterns look good, verify view implementations
The URL patterns are well-structured and follow Django conventions. However, let's verify the view implementations to ensure they handle the expected functionality.
apps/internal/actions.py (1)
1-5
: LGTM! Clean imports and good structure
The imports are well-organized and the type hints are properly used.
tests/test_internal/test_actions.py (1)
4-42
: Add test cases for error scenarios
The current tests only cover the happy path. Consider adding test cases for:
- Invalid resource_type
- Non-existent internal_id
- API errors from NetSuite
- Missing required query parameters
Let's check if these scenarios are tested elsewhere:
def get_accounting_fields(query_params: Dict): | ||
ns_connection = get_netsuite_connection(query_params) | ||
resource_type = query_params.get('resource_type') | ||
internal_id = query_params.get('internal_id') | ||
|
||
return ns_connection.get_accounting_fields(resource_type, internal_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add parameter validation and error handling
The function needs validation for required parameters and proper error handling.
Implement these improvements:
def get_accounting_fields(query_params: Dict):
+ required_params = ['resource_type', 'internal_id']
+ for param in required_params:
+ if not query_params.get(param):
+ raise ValueError(f'{param} is required in query parameters')
+
ns_connection = get_netsuite_connection(query_params)
resource_type = query_params.get('resource_type')
internal_id = query_params.get('internal_id')
- return ns_connection.get_accounting_fields(resource_type, internal_id)
+ try:
+ return ns_connection.get_accounting_fields(resource_type, internal_id)
+ except Exception as e:
+ raise ValueError(f'Failed to get accounting fields: {str(e)}')
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def get_accounting_fields(query_params: Dict): | |
ns_connection = get_netsuite_connection(query_params) | |
resource_type = query_params.get('resource_type') | |
internal_id = query_params.get('internal_id') | |
return ns_connection.get_accounting_fields(resource_type, internal_id) | |
def get_accounting_fields(query_params: Dict): | |
required_params = ['resource_type', 'internal_id'] | |
for param in required_params: | |
if not query_params.get(param): | |
raise ValueError(f'{param} is required in query parameters') | |
ns_connection = get_netsuite_connection(query_params) | |
resource_type = query_params.get('resource_type') | |
internal_id = query_params.get('internal_id') | |
try: | |
return ns_connection.get_accounting_fields(resource_type, internal_id) | |
except Exception as e: | |
raise ValueError(f'Failed to get accounting fields: {str(e)}') |
def get_netsuite_connection(query_params: Dict): | ||
org_id = query_params.get('org_id') | ||
|
||
workspace = Workspace.objects.get(fyle_org_id=org_id) | ||
workspace_id = workspace.id | ||
ns_credentials = NetSuiteCredentials.objects.get(workspace_id=workspace.id) | ||
|
||
return NetSuiteConnector(netsuite_credentials=ns_credentials, workspace_id=workspace_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add input validation and error handling
The function has several critical issues that need to be addressed:
- Missing validation for required
org_id
parameter - No error handling for database queries
- Direct use of org_id without validation could pose security risks
Consider implementing these improvements:
def get_netsuite_connection(query_params: Dict):
- org_id = query_params.get('org_id')
+ org_id = query_params.get('org_id')
+ if not org_id:
+ raise ValueError('org_id is required in query parameters')
+
+ try:
+ workspace = Workspace.objects.get(fyle_org_id=org_id)
+ ns_credentials = NetSuiteCredentials.objects.get(workspace_id=workspace.id)
+ except (Workspace.DoesNotExist, NetSuiteCredentials.DoesNotExist) as e:
+ raise ValueError(f'Invalid org_id or missing credentials: {str(e)}')
- workspace = Workspace.objects.get(fyle_org_id=org_id)
workspace_id = workspace.id
- ns_credentials = NetSuiteCredentials.objects.get(workspace_id=workspace.id)
return NetSuiteConnector(netsuite_credentials=ns_credentials, workspace_id=workspace_id)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def get_netsuite_connection(query_params: Dict): | |
org_id = query_params.get('org_id') | |
workspace = Workspace.objects.get(fyle_org_id=org_id) | |
workspace_id = workspace.id | |
ns_credentials = NetSuiteCredentials.objects.get(workspace_id=workspace.id) | |
return NetSuiteConnector(netsuite_credentials=ns_credentials, workspace_id=workspace_id) | |
def get_netsuite_connection(query_params: Dict): | |
org_id = query_params.get('org_id') | |
if not org_id: | |
raise ValueError('org_id is required in query parameters') | |
try: | |
workspace = Workspace.objects.get(fyle_org_id=org_id) | |
ns_credentials = NetSuiteCredentials.objects.get(workspace_id=workspace.id) | |
except (Workspace.DoesNotExist, NetSuiteCredentials.DoesNotExist) as e: | |
raise ValueError(f'Invalid org_id or missing credentials: {str(e)}') | |
workspace_id = workspace.id | |
return NetSuiteConnector(netsuite_credentials=ns_credentials, workspace_id=workspace_id) |
def get_exported_entry(query_params: Dict): | ||
ns_connection = get_netsuite_connection(query_params) | ||
resource_type = query_params.get('resource_type') | ||
internal_id = query_params.get('internal_id') | ||
|
||
return ns_connection.get_exported_entry(resource_type, internal_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Refactor to reduce code duplication and add validation
This function is nearly identical to get_accounting_fields
. Consider refactoring to reduce duplication.
Here's a suggested implementation:
+def _validate_and_get_params(query_params: Dict, operation: str):
+ required_params = ['resource_type', 'internal_id']
+ for param in required_params:
+ if not query_params.get(param):
+ raise ValueError(f'{param} is required in query parameters')
+
+ ns_connection = get_netsuite_connection(query_params)
+ resource_type = query_params.get('resource_type')
+ internal_id = query_params.get('internal_id')
+ return ns_connection, resource_type, internal_id
+
def get_exported_entry(query_params: Dict):
- ns_connection = get_netsuite_connection(query_params)
- resource_type = query_params.get('resource_type')
- internal_id = query_params.get('internal_id')
+ ns_connection, resource_type, internal_id = _validate_and_get_params(
+ query_params, 'exported_entry'
+ )
- return ns_connection.get_exported_entry(resource_type, internal_id)
+ try:
+ return ns_connection.get_exported_entry(resource_type, internal_id)
+ except Exception as e:
+ raise ValueError(f'Failed to get exported entry: {str(e)}')
Also update get_accounting_fields
to use the new helper function:
def get_accounting_fields(query_params: Dict):
- ns_connection = get_netsuite_connection(query_params)
- resource_type = query_params.get('resource_type')
- internal_id = query_params.get('internal_id')
+ ns_connection, resource_type, internal_id = _validate_and_get_params(
+ query_params, 'accounting_fields'
+ )
- return ns_connection.get_accounting_fields(resource_type, internal_id)
+ try:
+ return ns_connection.get_accounting_fields(resource_type, internal_id)
+ except Exception as e:
+ raise ValueError(f'Failed to get accounting fields: {str(e)}')
Committable suggestion skipped: line range outside the PR's diff.
def test_get_accounting_fields(db, mocker): | ||
query_params = { | ||
'org_id': 'or79Cob97KSh', | ||
'resource_type': 'employees', | ||
} | ||
mocker.patch( | ||
'netsuitesdk.api.employees.Employees.get_all_generator', | ||
return_value=data['get_all_employees'] | ||
) | ||
|
||
mocker.patch('netsuitesdk.api.currencies.Currencies.get_all') | ||
|
||
mocker.patch( | ||
'netsuitesdk.api.custom_lists.CustomLists.get', | ||
return_value=data['get_custom_list'] | ||
) | ||
|
||
fields = get_accounting_fields(query_params) | ||
assert fields is not None | ||
|
||
query_params['resource_type'] = 'custom_lists' | ||
query_params['internal_id'] = '1' | ||
fields = get_accounting_fields(query_params) | ||
assert fields is not None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add docstring and improve test assertions
The test function would benefit from:
- A docstring explaining the test scenarios
- Assertions on the actual content of returned fields
- Using a fixture for org_id instead of hardcoding
def test_get_accounting_fields(db, mocker):
+ """
+ Test get_accounting_fields action with different resource types:
+ 1. Test with employees resource type
+ 2. Test with custom_lists resource type
+ """
query_params = {
- 'org_id': 'or79Cob97KSh',
+ 'org_id': pytest.fixture('org_id'),
'resource_type': 'employees',
}
Committable suggestion skipped: line range outside the PR's diff.
return_value=data['get_all_employees'] | ||
) | ||
|
||
mocker.patch('netsuitesdk.api.currencies.Currencies.get_all') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix empty mock for Currencies.get_all
The mock for Currencies.get_all is missing a return value, which could lead to unexpected behavior.
- mocker.patch('netsuitesdk.api.currencies.Currencies.get_all')
+ mocker.patch(
+ 'netsuitesdk.api.currencies.Currencies.get_all',
+ return_value=data['get_all_currencies']
+ )
Committable suggestion skipped: line range outside the PR's diff.
) | ||
|
||
except Exception: | ||
logger.info(f"Error in AccountingFieldsView: {traceback.format_exc()}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix incorrect logger message
The error message incorrectly references "AccountingFieldsView" instead of "ExportedEntryView".
- logger.info(f"Error in AccountingFieldsView: {traceback.format_exc()}")
+ logger.info(f"Error in ExportedEntryView: {traceback.format_exc()}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
logger.info(f"Error in AccountingFieldsView: {traceback.format_exc()}") | |
logger.info(f"Error in ExportedEntryView: {traceback.format_exc()}") |
authentication_classes = [] | ||
permission_classes = [IsAuthenticatedForInternalAPI] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix contradictory authentication configuration
The empty authentication_classes = []
with IsAuthenticatedForInternalAPI
permission is contradictory. Without an authentication class, there's no way to authenticate the request, making the permission check ineffective.
class AccountingFieldsView(generics.GenericAPIView):
- authentication_classes = []
+ authentication_classes = [TokenAuthentication] # or appropriate auth class
permission_classes = [IsAuthenticatedForInternalAPI]
Committable suggestion skipped: line range outside the PR's diff.
params = request.query_params | ||
assert_valid(params.get('org_id') is not None, 'Org ID is required') | ||
assert_valid(params.get('resource_type') is not None, 'Resource Type is required') | ||
assert_valid(params.get('internal_id') is not None, 'Internal ID is required') | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add resource type validation and refactor validation logic
- The
resource_type
parameter should be validated against allowed values - The validation logic is duplicated between views and could be refactored into a reusable function
+VALID_RESOURCE_TYPES = {'custom_segments', 'custom_lists', 'custom_record_types', ...} # add all valid types
+
+def validate_params(params, require_internal_id=True):
+ assert_valid(params.get('org_id') is not None, 'Org ID is required')
+ assert_valid(params.get('resource_type') is not None, 'Resource Type is required')
+ assert_valid(
+ params.get('resource_type') in VALID_RESOURCE_TYPES,
+ f'Invalid resource type. Must be one of: {", ".join(VALID_RESOURCE_TYPES)}'
+ )
+ if require_internal_id:
+ assert_valid(params.get('internal_id') is not None, 'Internal ID is required')
+
def get(self, request, *args, **kwargs):
try:
params = request.query_params
- assert_valid(params.get('org_id') is not None, 'Org ID is required')
- assert_valid(params.get('resource_type') is not None, 'Resource Type is required')
- assert_valid(params.get('internal_id') is not None, 'Internal ID is required')
+ validate_params(params)
Committable suggestion skipped: line range outside the PR's diff.
def get_exported_entry(self, resource_type: str, export_id: str): | ||
module = getattr(self.connection, resource_type) | ||
response = getattr(module, 'get')(export_id) | ||
return json.loads(json.dumps(response, default=str)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling and improve type safety.
The method needs several improvements for robustness and type safety:
- Add error handling for invalid resource types
- Add input validation for export_id
- Add type hints
- Replace getattr with direct attribute access
Here's the suggested implementation:
- def get_exported_entry(self, resource_type: str, export_id: str):
+ def get_exported_entry(self, resource_type: str, export_id: str) -> Dict:
+ """
+ Retrieve a specific exported entry.
+
+ Args:
+ resource_type (str): The type of resource to fetch.
+ export_id (str): The export ID of the entry.
+
+ Returns:
+ Dict: Parsed JSON representation of the exported entry.
+
+ Raises:
+ ValueError: If resource_type is invalid or export_id is empty.
+ """
+ if not resource_type:
+ raise ValueError('Resource type is required')
+ if not export_id:
+ raise ValueError('Export ID is required')
+
module = getattr(self.connection, resource_type)
- response = getattr(module, 'get')(export_id)
+ response = module.get(export_id)
return json.loads(json.dumps(response, default=str))
Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.8.0)
1198-1198: Do not call getattr
with a constant attribute value. It is not any safer than normal property access.
Replace getattr
with attribute access
(B009)
def get_accounting_fields(self, resource_type: str, internal_id: str): | ||
""" | ||
Retrieve accounting fields for a specific resource type and internal ID. | ||
|
||
Args: | ||
resource_type (str): The type of resource to fetch. | ||
internal_id (str): The internal ID of the resource. | ||
|
||
Returns: | ||
list or dict: Parsed JSON representation of the resource data. | ||
""" | ||
module = getattr(self.connection, resource_type) | ||
method_map = { | ||
'currencies': 'get_all', | ||
'custom_segments': 'get', | ||
'custom_lists': 'get', | ||
'custom_record_types': 'get_all_by_id', | ||
} | ||
method = method_map.get(resource_type, 'get_all_generator') | ||
|
||
if method in ('get', 'get_all_by_id'): | ||
response = getattr(module, method)(internal_id) | ||
else: | ||
response = getattr(module, method)() | ||
|
||
if method == 'get_all_generator': | ||
response = [row for responses in response for row in responses] | ||
|
||
return json.loads(json.dumps(response, default=str)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling and improve type safety.
The method needs several improvements for robustness and type safety:
- Add error handling for invalid resource types
- Add input validation for internal_id
- Add return type hint
- Replace getattr with direct attribute access for module
Here's the suggested implementation:
- def get_accounting_fields(self, resource_type: str, internal_id: str):
+ def get_accounting_fields(self, resource_type: str, internal_id: str) -> Union[List, Dict]:
+ """
+ Retrieve accounting fields for a specific resource type and internal ID.
+
+ Args:
+ resource_type (str): The type of resource to fetch.
+ internal_id (str): The internal ID of the resource.
+
+ Returns:
+ Union[List, Dict]: Parsed JSON representation of the resource data.
+
+ Raises:
+ ValueError: If resource_type is invalid or internal_id is empty.
+ """
+ if not resource_type:
+ raise ValueError('Resource type is required')
+ if not internal_id:
+ raise ValueError('Internal ID is required')
+
module = getattr(self.connection, resource_type)
method_map = {
'currencies': 'get_all',
'custom_segments': 'get',
'custom_lists': 'get',
'custom_record_types': 'get_all_by_id',
}
- method = method_map.get(resource_type, 'get_all_generator')
+ method_name = method_map.get(resource_type, 'get_all_generator')
+ method = getattr(module, method_name)
- if method in ('get', 'get_all_by_id'):
- response = getattr(module, method)(internal_id)
+ if method_name in ('get', 'get_all_by_id'):
+ response = method(internal_id)
else:
- response = getattr(module, method)()
+ response = method()
- if method == 'get_all_generator':
+ if method_name == 'get_all_generator':
response = [row for responses in response for row in responses]
return json.loads(json.dumps(response, default=str))
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def get_accounting_fields(self, resource_type: str, internal_id: str): | |
""" | |
Retrieve accounting fields for a specific resource type and internal ID. | |
Args: | |
resource_type (str): The type of resource to fetch. | |
internal_id (str): The internal ID of the resource. | |
Returns: | |
list or dict: Parsed JSON representation of the resource data. | |
""" | |
module = getattr(self.connection, resource_type) | |
method_map = { | |
'currencies': 'get_all', | |
'custom_segments': 'get', | |
'custom_lists': 'get', | |
'custom_record_types': 'get_all_by_id', | |
} | |
method = method_map.get(resource_type, 'get_all_generator') | |
if method in ('get', 'get_all_by_id'): | |
response = getattr(module, method)(internal_id) | |
else: | |
response = getattr(module, method)() | |
if method == 'get_all_generator': | |
response = [row for responses in response for row in responses] | |
return json.loads(json.dumps(response, default=str)) | |
def get_accounting_fields(self, resource_type: str, internal_id: str) -> Union[List, Dict]: | |
""" | |
Retrieve accounting fields for a specific resource type and internal ID. | |
Args: | |
resource_type (str): The type of resource to fetch. | |
internal_id (str): The internal ID of the resource. | |
Returns: | |
Union[List, Dict]: Parsed JSON representation of the resource data. | |
Raises: | |
ValueError: If resource_type is invalid or internal_id is empty. | |
""" | |
if not resource_type: | |
raise ValueError('Resource type is required') | |
if not internal_id: | |
raise ValueError('Internal ID is required') | |
module = getattr(self.connection, resource_type) | |
method_map = { | |
'currencies': 'get_all', | |
'custom_segments': 'get', | |
'custom_lists': 'get', | |
'custom_record_types': 'get_all_by_id', | |
} | |
method_name = method_map.get(resource_type, 'get_all_generator') | |
method = getattr(module, method_name) | |
if method_name in ('get', 'get_all_by_id'): | |
response = method(internal_id) | |
else: | |
response = method() | |
if method_name == 'get_all_generator': | |
response = [row for responses in response for row in responses] | |
return json.loads(json.dumps(response, default=str)) |
Description
Please add PR description here, add screenshots if needed
Clickup
https://app.clickup.com/
Summary by CodeRabbit
Release Notes
New Features
accounting_fields/
andexported_entry/
.Bug Fixes
Chores