"""
Created on Apr 2, 2015
"""
import os
import re
import sys
from pathlib import Path
from typing import *
from openpyxl import load_workbook, Workbook
from openpyxl.utils import get_column_letter
from openpyxl.worksheet.table import Table as WorksheetTable, TableStyleInfo
from bi_etl.components.row.row import Row
from bi_etl.components.xlsx_reader import XLSXReader
from bi_etl.scheduler.task import ETLTask
from bi_etl.statistics import Statistics
__all__ = ['XLSXWriter']
[docs]
class XLSXWriter(XLSXReader):
"""
XLSXWriter will write rows to a Microsoft Excel XLSX formatted workbook.
Parameters
----------
task: ETLTask
The instance to register in (if not None)
file_name: str
The file_name to parse as xlsx.
logical_name: str
The logical name of this source. Used for log messages.
write_only: bool
Should we use the faster write only mode ?
(only supports brand-new files not adding data to an existing file)
Attributes
----------
column_names: list
The names to use for columns
header_row: int
The sheet row to read headers from. Default = 1.
start_row: int
The first row to parse for data. Default = header_row + 1
workbook: :class:`openpyxl.workbook.workbook.Workbook`
The workbook that was opened.
log_first_row : boolean
Should we log progress on the first row read. *Only applies if used as a source.*
(inherited from ETLComponent)
max_rows : int, optional
The maximum number of rows to read. *Only applies if Table is used as a source.*
(inherited from ETLComponent)
primary_key: list
The name of the primary key column(s). Only impacts trace messages. Default=None.
(inherited from ETLComponent)
progress_frequency: int
How often (in seconds) to output progress messages. None for no progress messages.
(inherited from ETLComponent)
progress_message: str
The progress message to print. Default is ``"{logical_name} row # {row_number}"``.
Note ``logical_name`` and ``row_number`` subs.
(inherited from ETLComponent)
restkey: str
Column name to catch extra long rows (more columns than we have column names)
when reading values (extra values).
restval: str
The value to put in columns that are in the column_names but
not present in a given row when reading (missing values).
"""
[docs]
def __init__(self,
task: Optional[ETLTask],
file_name: Union[str, Path],
logical_name: str = None,
write_only: bool = True,
**kwargs
):
super().__init__(
task=task,
file_name=file_name,
logical_name=logical_name,
)
self.headers_written = False
self.write_only = write_only
self._insert_cnt = 0
self._insert_cnt_this_sheet = 0
# Matches invalid XML1.0 unicode characters, like control characters:
# http://www.w3.org/TR/2006/REC-xml-20060816/#charsets
# http://stackoverflow.com/questions/1707890/fast-way-to-filter-illegal-xml-unicode-chars-in-python
_illegal_unichrs = [(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F),
(0x7F, 0x84), (0x86, 0x9F),
(0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF)]
if sys.maxunicode >= 0x10000: # not narrow build
_illegal_unichrs.extend([(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF),
(0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF),
(0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
(0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF),
(0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF),
(0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
(0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF),
(0xFFFFE, 0xFFFFF), (0x10FFFE, 0x10FFFF)])
_illegal_ranges = ["%s-%s" % (chr(low), chr(high))
for (low, high) in _illegal_unichrs]
self._illegal_xml_chars_RE = re.compile(u'[%s]' % u''.join(_illegal_ranges))
# Should be the last call of every init
self.set_kwattrs(**kwargs)
@property
def __repr__(self):
return f"XLSXWriter({self.logical_name})"
@property
def rows_inserted(self):
return self._insert_cnt
@property
def rows_inserted_this_sheet(self):
return self._insert_cnt_this_sheet
@property
def workbook(self):
if self._workbook is None:
if self.write_only:
self._workbook = Workbook(write_only=True)
else:
if os.path.isfile(self.file_name):
self._workbook = load_workbook(filename=self.file_name, read_only=False)
else:
self._workbook = Workbook()
del self._workbook['Sheet']
return self._workbook
[docs]
def set_active_worksheet_by_name(self, sheet_name):
self._active_worksheet_name = sheet_name
if sheet_name not in self.workbook:
self._active_worksheet = self.workbook.create_sheet(sheet_name)
self._insert_cnt_this_sheet = 0
if self.headers_written:
self._column_names = None
self.headers_written = False
else:
self._active_worksheet = self.workbook[sheet_name]
self._column_names = None
self._insert_cnt_this_sheet = 0
[docs]
def set_active_worksheet_by_number(self, sheet_number: int):
"""
Change to an existing worksheet based on the sheet number.
"""
try:
super().set_active_worksheet_by_number(sheet_number)
except ValueError as e:
raise ValueError(f"{e}. Use set_active_worksheet_by_name to create new sheets.")
def _obtain_column_names(self):
raise ValueError(f'Column names must be explicitly set on {self}')
[docs]
def set_columns_and_widths(self, columns_dict: Dict[str, float]):
"""
Set the column names and widths at the same time.
See :py:meth:`set_widths`.
"""
self.column_names = columns_dict.keys()
self.set_widths(columns_dict.values())
[docs]
def insert_row(
self,
source_row: Row, # Must be a single row
additional_insert_values: dict = None,
stat_name: str = 'insert',
parent_stats: Statistics = None,
) -> Row:
"""
Inserts a row into the database (batching rows as batch_size)
Parameters
----------
source_row:
The row with values to insert
additional_insert_values:
Values to add / override in the row before inserting.
stat_name:
Name of this step for the ETLTask statistics.
parent_stats:
Optional Statistics object to nest this steps statistics in.
Default is to place statistics in the ETLTask level statistics.
Returns
-------
new_row
"""
stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
stats.timer.start()
# row_values = [source_row[col] for col in self.column_names]
if not self.headers_written:
self.write_header()
assert len(source_row) > 0, f"Empty row passed into {self} insert"
values = []
for column_name in self.column_names:
if additional_insert_values and column_name in additional_insert_values:
col_value = additional_insert_values[column_name]
else:
try:
col_value = source_row[column_name]
except KeyError:
col_value = None
if isinstance(col_value, str):
col_value = self._illegal_xml_chars_RE.sub('\\?', col_value)
values.append(col_value)
new_row = self.row_object(iteration_header=self.full_iteration_header)
new_row.update_from_values(values)
assert len(values) > 0, f"No values with column names from {self} found in {source_row}"
self.active_worksheet.append(values)
self._insert_cnt += 1
self._insert_cnt_this_sheet += 1
stats.timer.stop()
return new_row
[docs]
def set_widths(self, column_widths: Iterable[float]):
"""
Set the column widths in the xlsx for the currently active worksheet.
"""
for i, column_width in enumerate(column_widths):
self.active_worksheet.column_dimensions[get_column_letter(i + 1)].width = column_width
[docs]
def make_table_from_inserted_data(self, style_name: str = 'TableStyleMedium2'):
"""
Format the newly inserted worksheet data as a table.
Parameters
----------
style_name:
The name of the Excel style to apply to the table.
"""
if self._insert_cnt_this_sheet == 0:
return
last_col_letter = get_column_letter(len(self.column_names))
table_ref = f"A1:{last_col_letter}{self._insert_cnt_this_sheet + 1}"
self.log.debug(f"Adding table to worksheet {self.active_worksheet_name} for range {table_ref}")
table = WorksheetTable(
displayName=f"Tbl_{self.active_worksheet_name}",
ref=table_ref
)
style = TableStyleInfo(
name=style_name,
showFirstColumn=False,
showLastColumn=False,
showRowStripes=True,
showColumnStripes=False
)
table.tableStyleInfo = style
# noinspection PyProtectedMember
table._initialise_columns()
for column, value in zip(table.tableColumns, self.column_names):
column.name = value
self.active_worksheet.add_table(table)
[docs]
def insert(
self,
source_row: Union[Row, list], # Could also be a whole list of rows
parent_stats: Statistics = None,
**kwargs
):
"""
Insert a row or list of rows in the table.
Parameters
----------
source_row:
Row(s) to insert
parent_stats:
Optional Statistics object to nest this steps statistics in.
Default is to place statistics in the ETLTask level statistics.
"""
if isinstance(source_row, list):
for row in source_row:
self.insert_row(
row,
parent_stats=parent_stats,
**kwargs
)
else:
self.insert_row(
source_row,
parent_stats=parent_stats,
**kwargs
)
[docs]
def close(self, error: bool = False):
"""
Close the xlsx file, saving first if ``error`` is false.
Parameters
----------
error:
Did we run into an error during processing?
Errors cause a rollback, which skips the save of the file.
"""
if self.has_workbook_init():
if not error:
self.workbook.save(filename=self.file_name)
else:
self.log.info(f"{self}.close not saving due to error")
self.workbook.close()
super().close(error=error)