"""
Created on Apr 2, 2015
"""
import io
import os
from typing import *
from datetime import datetime, time
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from bi_etl.components.etlcomponent import ETLComponent
from bi_etl.scheduler.task import ETLTask
__all__ = ['XLSXReader']
class XLSXReader(ETLComponent):
    """
    XLSXReader will read rows from a Microsoft Excel XLSX formatted workbook.

    Parameters
    ----------
    task: ETLTask
        The instance to register in (if not None)

    file_name: str
        The file_name to parse as xlsx.

    logical_name: str
        The logical name of this source. Used for log messages.
        Defaults to the base name of ``file_name``.

    kwargs:
        Attribute values applied via ``set_kwattrs`` at the end of ``__init__``.

    Attributes
    ----------
    column_names: list
        The names to use for columns

    header_row: int
        The sheet row to read headers from. Default = 1.

    start_row: int
        The first row to parse for data. Default = header_row + 1

    workbook: :class:`openpyxl.workbook.workbook.Workbook`
        The workbook that was opened.

    log_first_row : boolean
        Should we log progress on the first row read. *Only applies if used as a source.*
        (inherited from ETLComponent)

    max_rows : int, optional
        The maximum number of rows to read. *Only applies if Table is used as a source.*
        (inherited from ETLComponent)

    primary_key: list
        The name of the primary key column(s). Only impacts trace messages.  Default=None.
        (inherited from ETLComponent)

    progress_frequency: int
        How often (in seconds) to output progress messages. None for no progress messages.
        (inherited from ETLComponent)

    progress_message: str
        The progress message to print. Default is ``"{logical_name} row # {row_number}"``.
        Note ``logical_name`` and ``row_number`` subs.
        (inherited from ETLComponent)

    restkey: str
        Column name to catch extra long rows (more columns than we have column names)
        when reading values (extra values).

    restval: str
        The value to put in columns that are in the column_names but
        not present in a given row when reading (missing values).
    """

    def __init__(self,
                 task: Optional[ETLTask],
                 file_name: Union[str, Path],
                 logical_name: Optional[str] = None,
                 **kwargs
                 ):
        """
        Set up the reader. The workbook itself is opened lazily on first
        access of :attr:`workbook`.
        """
        self.file_name = file_name
        if logical_name is None:
            try:
                logical_name = os.path.basename(self.file_name)
            except (AttributeError, TypeError):
                # file_name is not path-like; fall back to its string form
                logical_name = str(self.file_name)

        # Don't pass kwargs up. They should be set here at the end
        super().__init__(
            task=task,
            logical_name=logical_name,
        )

        # column to catch long rows (more values than columns)
        self.restkey = 'extra data past last delimiter'
        # default value for short rows (value for missing keys)
        self.restval = None

        self.__header_row = 1
        self.__start_row = None
        self.__active_row = None

        self._workbook = None
        self._active_worksheet: Optional[Worksheet] = None
        self._active_worksheet_name: Optional[str] = None

        # Should be the last call of every init
        self.set_kwattrs(**kwargs)

    def __repr__(self):
        return f"XLSXReader({self.logical_name})"

    @property
    def header_row(self) -> int:
        """
        The sheet row to read headers from. Default = 1.
        """
        return self.__header_row

    @header_row.setter
    def header_row(self, value: int):
        self.__header_row = value

    @property
    def start_row(self) -> int:
        """
        The sheet row to start reading data from.
        Default = header_row + 1
        """
        if self.__start_row is not None:
            return self.__start_row
        return self.header_row + 1

    @start_row.setter
    def start_row(self, value: int):
        self.__start_row = value

    def has_workbook_init(self) -> bool:
        """
        Return True if the workbook has already been loaded.
        """
        return self._workbook is not None

    @property
    def workbook(self):
        """
        The openpyxl workbook for ``file_name``, loaded (read-only) on first access.
        """
        if self._workbook is None:
            # Work around for openpyxl close not working correctly:
            # read the whole file into memory so that no handle on the
            # file itself is kept by the workbook.
            with open(self.file_name, "rb") as f:
                in_mem_file = io.BytesIO(f.read())
            self._workbook = load_workbook(filename=in_mem_file, read_only=True)
            # If the openpyxl library can properly close the file, this could
            # become load_workbook(filename=self.file_name, read_only=True)
        return self._workbook

    def set_active_worksheet_by_name(self, sheet_name: str):
        """
        Change to an existing worksheet based on the sheet name.

        Parameters
        ----------
        sheet_name: str
            The name of the worksheet to make active.
        """
        self._active_worksheet = self.workbook[sheet_name]
        self._active_worksheet_name = sheet_name
        # Reset the stored column names and iteration header of the previous sheet
        self._column_names = None
        self._full_iteration_header = None

    def set_active_worksheet_by_number(self, sheet_number: int):
        """
        Change to an existing worksheet based on the sheet number.

        Parameters
        ----------
        sheet_number: int
            Zero based index into the list returned by :meth:`get_sheet_names`.

        Raises
        ------
        ValueError
            If the workbook has fewer than ``sheet_number + 1`` worksheets.
        """
        sheet_names = self.get_sheet_names()
        if len(sheet_names) < (sheet_number + 1):
            raise ValueError(f"{self} does not have a worksheet numbered {sheet_number}")
        self.set_active_worksheet_by_name(sheet_names[sheet_number])

    @property
    def active_worksheet(self):
        """
        The worksheet rows are read from. Defaults to the first worksheet
        if none has been selected yet.
        """
        if self._active_worksheet is None:
            self.set_active_worksheet_by_number(0)
        return self._active_worksheet

    @property
    def active_worksheet_name(self) -> str:
        """
        The title of the active worksheet.
        """
        return self.active_worksheet.title

    def get_sheet_names(self):
        """
        Return the ``sheetnames`` of the workbook.
        """
        return self.workbook.sheetnames

    def get_sheet_by_name(self, name):
        """Returns a worksheet by its name.

        Parameters
        ----------
        name: str
            The name of the worksheet to look for

        Returns
        -------
        openpyxl.worksheet.worksheet.Worksheet
            Worksheet object, or None if no worksheet has the name specified.
        """
        try:
            return self.workbook[name]
        except KeyError:
            return None

    @property
    def line_num(self):
        """
        The current line number in the source file.
        line_num differs from rows_read in that rows_read deals with rows that would be returned to the caller
        """
        return self.__active_row

    def _obtain_column_names(self):
        """
        Read the header row and store it as the column names.
        """
        try:
            # NOTE(review): read_header_row is not defined in the part of the
            # class visible here; presumably provided elsewhere - confirm.
            row = self.read_header_row()
        except StopIteration:
            # Best effort: no header row could be read, leave the column names unset
            pass
        else:
            self._column_names = row
            if self.trace_data:
                self.log.debug(f"Column names read: {self._column_names}")

    @staticmethod
    def _get_cell_value(cell):
        """
        Return the cleaned up value of a single cell.

        * Strings are stripped; empty strings become None
          (to be consistent with DB reads).
        * The datetime 1899-12-30 00:00:00 is converted to the time 00:00:00.
        """
        value = cell.value
        if hasattr(value, 'strip'):
            value = value.strip()
            if value == '':
                value = None
        elif isinstance(value, datetime):
            # Excel time values of 12:00:00 AM come in as 1899-12-30 instead.
            # 12:00:00 AM is midnight, i.e. time(0, 0, 0) (not noon).
            if value == datetime(1899, 12, 30):
                value = time(0, 0, 0)
        return value

    @staticmethod
    def _get_cell_values(row_cells) -> list:
        """
        Return the cleaned up values (see ``_get_cell_value``) of all cells in a row.
        """
        # Convert empty strings to None to be consistent with DB reads
        return list(map(XLSXReader._get_cell_value, row_cells))

    def _raw_rows(self):
        """
        Yield one ``Row`` per worksheet row, starting at :attr:`start_row`.

        Values beyond the known columns are collected under ``restkey``
        (if it is not None); columns missing from a short row are set to
        ``restval``.
        """
        # See https://openpyxl.readthedocs.org/en/latest/tutorial.html
        self.__active_row = self.start_row
        len_column_names = len(self.column_names)
        this_iteration_header = self.full_iteration_header
        for row in self.active_worksheet.iter_rows(min_row=self.start_row):
            if len(row) > 0:
                # Track the row number reported by the sheet
                self.__active_row = row[0].row
            else:
                # No cell to take the row number from
                self.__active_row += 1

            row_values = XLSXReader._get_cell_values(row)
            d = self.Row(data=row_values[:len_column_names], iteration_header=this_iteration_header)

            len_row = len(row_values)
            if len_column_names < len_row:
                if self.restkey is not None:
                    # Note: Adding restkey to the row will create a new iteration header
                    #       (shared by subsequent rows with extra values)
                    d[self.restkey] = row_values[len_column_names:]
            elif len_column_names > len_row:
                # This could be done in a faster way, but hopefully is rare so not worth optimizing.
                # Fill every column that has no value in this (short) row.
                for key in self.column_names[len_row:]:
                    d[key] = self.restval
            yield d

    def close(self, error: bool = False):
        """
        Close the workbook (if it was opened) and then the component itself.
        Does nothing if the component is already closed.
        """
        if not self.is_closed:
            if self._workbook is not None:
                self._workbook.close()
                self._workbook = None
            super(XLSXReader, self).close(error=error)