Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added ability to define multiple line definitions #225

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 58 additions & 58 deletions src/invoice2data/extract/plugins/lines.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,76 +12,76 @@

def extract(self, content, output):
"""Try to extract lines from the invoice"""
lines = []
for line in self['lines']:
# First apply default options.
plugin_settings = DEFAULT_OPTIONS.copy()
plugin_settings.update(line)
line = plugin_settings

# First apply default options.
plugin_settings = DEFAULT_OPTIONS.copy()
plugin_settings.update(self['lines'])
self['lines'] = plugin_settings

# Validate settings
assert 'start' in self['lines'], 'Lines start regex missing'
assert 'end' in self['lines'], 'Lines end regex missing'
assert 'line' in self['lines'], 'Line regex missing'
# Validate settings
assert 'start' in line, 'Lines start regex missing'
assert 'end' in line, 'Lines end regex missing'
assert 'line' in line, 'Line regex missing'

start = re.search(self['lines']['start'], content)
end = re.search(self['lines']['end'], content)
if not start or not end:
logger.warning('no lines found - start %s, end %s', start, end)
return
content = content[start.end(): end.start()]
lines = []
current_row = {}
if 'first_line' not in self['lines'] and 'last_line' not in self['lines']:
self['lines']['first_line'] = self['lines']['line']
for line in re.split(self['lines']['line_separator'], content):
# if the line has empty lines in it , skip them
if not line.strip('').strip('\n') or not line:
continue
if 'first_line' in self['lines']:
match = re.search(self['lines']['first_line'], line)
if match:
if 'last_line' not in self['lines']:
start = re.search(line['start'], content)
end = re.search(line['end'], content)
if not start or not end:
logger.warning('no lines found - start %s, end %s', start, end)
return
content_section = content[start.end(): end.start()]
current_row = {}
if 'first_line' not in line and 'last_line' not in line:
line['first_line'] = line['line']
for line_content in re.split(line['line_separator'], content_section):
# if the line has empty lines in it , skip them
if not line_content.strip('').strip('\n') or not line_content:
continue
if 'first_line' in line:
match = re.search(line['first_line'], line_content)
if match:
if 'last_line' not in line:
if current_row:
lines.append(current_row)
current_row = {}
if current_row:
lines.append(current_row)
current_row = {
field: value.strip() if value else ''
for field, value in match.groupdict().items()
}
continue
if 'last_line' in line:
match = re.search(line['last_line'], line_content)
if match:
for field, value in match.groupdict().items():
current_row[field] = '%s%s%s' % (
current_row.get(field, ''),
current_row.get(field, '') and '\n' or '',
value.strip() if value else '',
)
if current_row:
lines.append(current_row)
current_row = {}
if current_row:
lines.append(current_row)
current_row = {
field: value.strip() if value else ''
for field, value in match.groupdict().items()
}
continue
if 'last_line' in self['lines']:
match = re.search(self['lines']['last_line'], line)
continue
match = re.search(line['line'], line_content)
if match:
for field, value in match.groupdict().items():
current_row[field] = '%s%s%s' % (
current_row.get(field, ''),
current_row.get(field, '') and '\n' or '',
value.strip() if value else '',
)
if current_row:
lines.append(current_row)
current_row = {}
continue
match = re.search(self['lines']['line'], line)
if match:
for field, value in match.groupdict().items():
current_row[field] = '%s%s%s' % (
current_row.get(field, ''),
current_row.get(field, '') and '\n' or '',
value.strip() if value else '',
)
continue
logger.debug('ignoring *%s* because it doesn\'t match anything', line)
if current_row:
lines.append(current_row)
logger.debug('ignoring *%s* because it doesn\'t match anything', line_content)
if current_row:
lines.append(current_row)

types = self['lines'].get('types', [])
for row in lines:
for name in row.keys():
if name in types:
row[name] = self.coerce_type(row[name], types[name])
types = line.get('types', [])
for row in lines:
for name in row.keys():
if name in types:
row[name] = self.coerce_type(row[name], types[name])

if lines:
output['lines'] = lines
if lines:
output['lines'] = lines