import sys
import pandas
import re
import xmlmanip
from tangier_api.api import ScheduleConnection
from tangier_api.api import ProviderConnection
from tangier_api.api import LocationConnection
from tangier_api import helpers
from tangier_api import exceptions
class ScheduleManipulation(ScheduleConnection):
    def save_schedule_from_range(self, start_date=None, end_date=None, site_ids=None, xml_string="", **tags):
"""
Saves schedule for indicated date range and facilities to ScheduleConnection object
:param start_date: (str) %Y-%m-%d date string indicating the beginning of the range from which to pull the schedule
:param end_date: (str) %Y-%m-%d date string indicating the ending of the range from which to pull the schedule
:param site_ids: (list or None) list of ids corresponding to the site(s) that the schedule will be pulled from, defaults to the list pulled from site_file in the __init__ function
:param xml_string: (xml string) overrides the default credential and/or schedule injection into base_xml
:param tags: (kwargs) things to be injected into the request.
:return:
"""
schedule_values_list = []
ranges = helpers.date_ranges(start_date, end_date)
for date_range in ranges:
            print(str(date_range))  # progress output: show the sub-range being fetched
schedule_values_list.extend(
self.get_schedule_values_list(date_range[0], date_range[1], site_ids, xml_string, **tags))
df = pandas.DataFrame(schedule_values_list)
if df.empty:
raise exceptions.APICallError('No schedule was returned in the given range.')
        df = df.sort_values(['shift_start_date', 'shift_end_date']).reset_index(drop=True)
self.saved_schedule = df.copy()
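    # A hedged usage sketch (hypothetical constructor arguments; whatever
    # ScheduleConnection's __init__ actually requires applies here):
    #
    #     conn = ScheduleManipulation(...)
    #     conn.save_schedule_from_range('2018-01-01', '2018-03-31')
    #     conn.saved_schedule.head()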
    def get_schedule_open(self, info=False):
"""
Gets DataFrame of all entries from schedule where providername == "open" in the saved_schedule
:param info: (bool) whether or not to print out progress
:return: (DataFrame) of all entries from schedule which were not worked (reportedminutes == 0)
"""
if self.saved_schedule is None:
raise exceptions.APICallError('There must be a saved schedule from save_schedule_from_range.')
df = self.saved_schedule.copy()
open_df = df[df['providername'] == 'open']
return open_df
    def get_schedule_empties(self, info=False):
"""
Gets DataFrame of all entries from schedule which were not worked (reportedminutes == 0) in the saved_schedule
:param info: (bool) whether or not to print out progress
:return: (DataFrame) of all entries from schedule which were not worked (reportedminutes == 0)
"""
if self.saved_schedule is None:
raise exceptions.APICallError('There must be a saved schedule from save_schedule_from_range.')
df = self.saved_schedule.copy()
empties = df[df['reportedminutes'] == '0']
return empties
    def get_schedule_conflicts(self, info=False):
"""
Gets DataFrame of all entries where an employee worked a double-booked shift in the saved_schedule
:param info: (bool) whether or not to print out progress
:return: (DataFrame) of all entries where an employee worked a double-booked shift
"""
if self.saved_schedule is None:
raise exceptions.APICallError('There must be a saved schedule from save_schedule_from_range.')
df = self.saved_schedule.copy()
        if 'provider_primary_key' not in df.columns:
            raise exceptions.APICallError('get_schedule_conflicts and get_schedule_duplicates '
                                          'rely on use of provider_primary_key=True.')
df = df.sort_values(['shift_start_date', 'shift_end_date'])
conflict_df = pandas.DataFrame()
unique_ids = list(df['provider_primary_key'].dropna().unique())
for c, emp_id in enumerate(unique_ids):
if (c % 13 == 12 or c == len(unique_ids) - 1) and info:
print(f'{(c+1)/len(unique_ids)*100:>5.2f}%')
elif info:
print(f'{(c+1)/len(unique_ids)*100:>5.2f}%', end=', ')
emp_sched = df.loc[df['provider_primary_key'] == emp_id]
for i, row in emp_sched.iterrows():
for j, row2 in emp_sched.iterrows():
if j <= i:
continue
elif row2['shift_start_date'] > row['shift_end_date']:
break
                    if (row['shift_start_date'] < row2['shift_end_date']
                            and row['shift_end_date'] > row2['shift_start_date']):
                        # standard interval-overlap test: the two shifts conflict
                        row['conflict_shift_start_date'] = row2['shift_start_date']
                        row['conflict_shift_end_date'] = row2['shift_end_date']
                        row['conflict_index'] = j
                        # DataFrame.append was removed in pandas 2.0; concat a one-row frame instead
                        conflict_df = pandas.concat(
                            [conflict_df, row[['conflict_index', 'provider_primary_key', 'shift_start_date',
                                               'shift_end_date', 'conflict_shift_start_date',
                                               'conflict_shift_end_date']].to_frame().T])
if not conflict_df.empty:
conflict_df['conflict_index'] = conflict_df['conflict_index'].astype(int)
return conflict_df
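    # The conflict test above is the standard interval-overlap check: two shifts
    # [s1, e1) and [s2, e2) overlap iff s1 < e2 and e1 > s2. A toy illustration
    # (hypothetical timestamps; lexicographic comparison suffices because the
    # values are zero-padded date strings):
    #
    #     s1, e1 = '2018-01-01 07:00', '2018-01-01 19:00'
    #     s2, e2 = '2018-01-01 12:00', '2018-01-02 00:00'
    #     assert s1 < e2 and e1 > s2  # overlapping, so flagged as a conflict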
    def get_schedule_duplicates(self, info=False):
"""
Gets DataFrame of all duplicate entries in the saved_schedule
:param info: (bool) whether or not to print out progress
:return: (DataFrame) of all duplicate entries
"""
if self.saved_schedule is None:
raise exceptions.APICallError('There must be a saved schedule from save_schedule_from_range.')
df = self.saved_schedule.copy()
        if 'provider_primary_key' not in df.columns:
            raise exceptions.APICallError('get_schedule_conflicts and get_schedule_duplicates '
                                          'rely on use of provider_primary_key=True.')
dupe_df = pandas.DataFrame()
unique_ids = list(df['provider_primary_key'].dropna().unique())
for c, emp_id in enumerate(unique_ids):
if (c % 13 == 12 or c == len(unique_ids) - 1) and info:
print(f'{(c+1)/len(unique_ids)*100:>5.2f}%')
elif info:
print(f'{(c+1)/len(unique_ids)*100:>5.2f}%', end=', ')
emp_sched = df.loc[df['provider_primary_key'] == emp_id]
for i, row in emp_sched.iterrows():
for j, row2 in emp_sched.iterrows():
if j <= i:
continue
elif row2['shift_start_date'] > row['shift_end_date']:
break
                    if (row['shift_start_date'] == row2['shift_start_date']
                            and row['shift_end_date'] == row2['shift_end_date']):
                        row['dupe_shift_start_date'] = row2['shift_start_date']
                        row['dupe_shift_end_date'] = row2['shift_end_date']
                        row['dupe_index'] = j
                        # DataFrame.append was removed in pandas 2.0; concat a one-row frame instead
                        dupe_df = pandas.concat(
                            [dupe_df, row[['dupe_index', 'provider_primary_key', 'shift_start_date',
                                           'shift_end_date', 'dupe_shift_start_date',
                                           'dupe_shift_end_date']].to_frame().T])
if not dupe_df.empty:
dupe_df['dupe_index'] = dupe_df['dupe_index'].astype(int)
return dupe_df
    def generate_duplicates_report(self, dupes):
        """
        Builds a side-by-side report of originals and their duplicates from the output of get_schedule_duplicates
        :param dupes: (DataFrame) as returned by get_schedule_duplicates
        :return: (DataFrame) alternating rows of originals and duplicates
        """
dupes = dupes.reset_index()
        # dupes_left will have originals, dupes_right will have duplicates of originals
        if 'index' not in dupes.columns or 'dupe_index' not in dupes.columns:
            return pandas.DataFrame()
        dupes_left = self.saved_schedule.loc[dupes['index']].reset_index()
        dupes_right = self.saved_schedule.loc[dupes['dupe_index']].reset_index()
        # concatenate and sort on the two indices; the result has alternating rows of originals and duplicates
        dupes_append = pandas.concat([dupes_left, dupes_right]).reset_index().sort_values(['level_0', 'index'])
        dupes_append = dupes_append.set_index(['level_0'])
return dupes_append
    def generate_conflicts_report(self, conflicts):
        """
        Builds a side-by-side report of conflicting shift pairs from the output of get_schedule_conflicts
        :param conflicts: (DataFrame) as returned by get_schedule_conflicts
        :return: (DataFrame) alternating rows of each conflicting pair
        """
        conflicts = conflicts.reset_index()
        # the column check must come before the .loc lookups below
        if 'index' not in conflicts.columns or 'conflict_index' not in conflicts.columns:
            return pandas.DataFrame()
        conflicts_left = self.saved_schedule.loc[conflicts['index']].reset_index()
        conflicts_right = self.saved_schedule.loc[conflicts['conflict_index']].reset_index()
        # concatenate and sort on the two indices; the result has alternating rows of each conflicting pair
        conflicts_append = pandas.concat([conflicts_left, conflicts_right]).reset_index().sort_values(['level_0', 'index'])
        conflicts_append = conflicts_append.set_index(['level_0'])
return conflicts_append
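    # Illustration of the interleaving trick shared by both report builders
    # (toy frames with hypothetical values):
    #
    #     left = pandas.DataFrame({'who': ['a1', 'b1']})   # originals
    #     right = pandas.DataFrame({'who': ['a2', 'b2']})  # their partners
    #     both = pandas.concat([left, right]).reset_index()
    #     both.sort_values('index')['who'].tolist()  # ['a1', 'a2', 'b1', 'b2']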
    def remove_schedule_open(self):
"""
Removes all entries from schedule which are just open shifts (providername == 'open') in the saved_schedule
:return:
"""
initial_length = self.saved_schedule.shape[0]
open_df = self.get_schedule_open().reset_index()
if open_df.empty:
print('No open shifts to remove.')
return
rows_to_remove = open_df.shape[0]
temp_df = self.saved_schedule.drop(open_df['index'])
if temp_df.shape[0] == initial_length - rows_to_remove:
self.saved_schedule = temp_df
else:
raise exceptions.APIError(
'An unexpected number of entries were removed; this indicates an issue with the saved schedule.')
print(f'Removed {rows_to_remove} open shifts.')
    def remove_schedule_empties(self):
"""
Removes all entries from schedule which were not worked (reportedminutes == 0) in the saved_schedule
:return:
"""
initial_length = self.saved_schedule.shape[0]
empty_df = self.get_schedule_empties().reset_index()
if empty_df.empty:
print('No empties to remove.')
return
rows_to_remove = empty_df.shape[0]
temp_df = self.saved_schedule.drop(empty_df['index'])
if temp_df.shape[0] == initial_length - rows_to_remove:
self.saved_schedule = temp_df
else:
raise exceptions.APIError(
'An unexpected number of entries were removed; this indicates an issue with the saved schedule.')
print(f'Removed {rows_to_remove} empties.')
    def remove_schedule_duplicates(self):
"""
Removes all duplicate entries in the saved_schedule
:return:
"""
initial_length = self.saved_schedule.shape[0]
dupe_df = self.get_schedule_duplicates()
# report must be generated before the duplicates are removed
duplicates_report = self.generate_duplicates_report(dupe_df)
if dupe_df.empty:
print('No duplicates to remove.')
return
rows_to_remove = dupe_df.shape[0]
temp_df = self.saved_schedule.drop(dupe_df['dupe_index'])
if temp_df.shape[0] == initial_length - rows_to_remove:
self.saved_schedule = temp_df
else:
raise exceptions.APIError(
'An unexpected number of entries were removed; this indicates an issue with the saved schedule.')
print(f'Removed {rows_to_remove} duplicates.')
    def remove_schedule_conflicts(self):
"""
Removes all conflicting entries in the saved_schedule
:return:
"""
initial_length = self.saved_schedule.shape[0]
conflict_df = self.get_schedule_conflicts()
        # report must be generated before the conflicts are removed
        conflicts_report = self.generate_conflicts_report(conflict_df)
        if conflict_df.empty:
            print('No conflicts to remove.')
return
rows_to_remove = 2 * conflict_df.shape[0]
temp_df = self.saved_schedule.drop(conflict_df['conflict_index'])
temp_df = temp_df.drop(conflict_df.reset_index()['index'])
if temp_df.shape[0] == initial_length - rows_to_remove:
self.saved_schedule = temp_df
else:
raise exceptions.APIError(
'An unexpected number of entries were removed; this indicates an issue with the saved schedule.')
print(f'Removed {rows_to_remove} conflicts.')
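# A hedged end-to-end sketch of the cleanup workflow (hypothetical constructor
# arguments; order matters because each removal mutates self.saved_schedule):
#
#     sched = ScheduleManipulation(...)
#     sched.save_schedule_from_range('2018-01-01', '2018-03-31',
#                                    include_provider_primary_key='true')
#     sched.remove_schedule_open()
#     sched.remove_schedule_empties()
#     sched.remove_schedule_duplicates()
#     sched.remove_schedule_conflicts()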
class ProviderReport(ProviderConnection):
def __init__(self, file, *args, **kwargs):
        if isinstance(file, pandas.DataFrame):
self.df = file.copy()
elif file.upper().endswith('.CSV'):
self.df = pandas.read_csv(file)
else:
self.df = pandas.read_excel(file)
super(ProviderReport, self).__init__(*args, **kwargs)
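    # __init__ accepts a DataFrame directly, a .csv path, or anything
    # pandas.read_excel can open; hypothetical examples:
    #
    #     ProviderReport(pandas.DataFrame({'provider_id': ['1234567890']}), ...)
    #     ProviderReport('report.csv', ...)
    #     ProviderReport('report.xlsx', ...)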
    def add_to_report(self, *args, key_column="provider_id"):
"""
Adds the specified provider information to an excel or csv report according to NPI (emp_id)
:param args: (list) of provider fields to be retrieved from tangier and added to the report
:param key_column: (str) indicates the header name of the column that contains npis or emp_ids on the report
:return: None
"""
        # normalize ids like '1234.0' to integers; anything containing letters maps to 0
        clean_ids = lambda x: int(float(x)) if not re.findall('[a-zA-Z]', f'{x}') else 0
self.df[key_column] = self.df[key_column].apply(clean_ids)
self.df[key_column] = self.df[key_column].astype(str)
provider_ids = list(self.df[key_column].unique())
info_list = self.provider_info_values_list(provider_ids=provider_ids)
        get_if_in_keys = lambda x, key: x.get(key, '')
columns_to_add = {arg: f'provider_{arg}' for arg in args}
for column in columns_to_add.values():
self.df[column] = ''
original_index_name = self.df.index.name
self.df = self.df.reset_index()
for index, row in self.df.iterrows():
provider_info = [*filter(lambda x: x.get("emp_id") == row[key_column], info_list)]
if provider_info:
for dict_key, df_column in columns_to_add.items():
self.df.loc[index, f'{df_column}'] = get_if_in_keys(provider_info[0], dict_key)
columns = list(self.df.columns.values)
reordered_columns = [key_column, *columns_to_add.values()]
for col in reordered_columns:
columns.remove(col)
reordered_columns.extend(columns)
self.df = self.df[[*reordered_columns]]
self.df = self.df.set_index("index" if not original_index_name else original_index_name)
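# A minimal usage sketch for ProviderReport (hypothetical file and field names;
# the retrievable fields depend on what Tangier's provider info returns):
#
#     report = ProviderReport('staffing_report.xlsx', ...)
#     report.add_to_report('first_name', 'last_name', key_column='provider_id')
#     report.df.to_excel('staffing_report_with_providers.xlsx')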
class ScheduleWithData:
def __init__(self, schedule_connection, provider_connection, location_connection):
        try:
            import pandas
        except ImportError:
            raise ImportError(f'{type(self).__name__} requires pandas to be importable in your environment.')
if not isinstance(schedule_connection, ScheduleConnection):
raise exceptions.APIError('schedule_connection argument (arg[0]) must be a ScheduleConnection instance.')
if not isinstance(provider_connection, ProviderConnection):
raise exceptions.APIError('provider_connection argument (arg[1]) must be a ProviderConnection instance.')
if not isinstance(location_connection, LocationConnection):
            raise exceptions.APIError('location_connection argument (arg[2]) must be a LocationConnection instance.')
self.sconn = schedule_connection
self.pconn = provider_connection
self.lconn = location_connection
def _get_provider_info(self):
self.providers = pandas.DataFrame(self.pconn.provider_info_values_list(all_providers=True,
use_primary_keys=True)).fillna('')
def _get_location_info(self):
self.locations = pandas.DataFrame(self.lconn.location_info_values_list(site_ids='ALL_SITE_IDS')).fillna('')
    def save_schedule_from_range(self, start_date, end_date):
self._get_provider_info()
self._get_location_info()
self.sconn.save_schedule_from_range(start_date, end_date,
site_ids=list(self.locations['site_id'].unique()),
include_provider_primary_key='true')
self.saved_schedule = self.sconn.saved_schedule
self.temp_locations = self.locations.drop(columns=['@action', 'is_scheduled']) \
.rename(columns={'name': 'site_name', 'short_name': 'site_short_name'})
self.temp_providers = self.providers.drop(
columns=['@action', 'processed', 'comment', 'street', 'city', 'state', 'zip'])
with_sites = self.saved_schedule.merge(self.temp_locations, how='left', left_on=['siteid'],
right_on=['site_id']).drop(columns=['location'])
with_all = with_sites.merge(self.temp_providers, how='left', left_on=['providerprimarykey'],
right_on=['provider_primary_key'])
with_all = with_all.drop(columns=['empid', 'siteid', 'providerprimarykey'])
self.saved_schedule = with_all.fillna('')
self.sconn.saved_schedule = self.saved_schedule
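# A hedged sketch of wiring the three connections together (hypothetical
# constructor arguments for each connection class):
#
#     swd = ScheduleWithData(ScheduleConnection(...), ProviderConnection(...),
#                            LocationConnection(...))
#     swd.save_schedule_from_range('2018-01-01', '2018-03-31')
#     swd.saved_schedule.head()  # schedule rows joined with site and provider info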
class ProviderLocations:
def __init__(self, pconn, lconn):
self.pconn = pconn
self.lconn = lconn
self.all_locations = lconn.location_info_values_list()
self.all_providers = pconn.provider_info_values_list(all_providers=True)
self.all_location_provider_values = []
@property
def all_location_provider_values(self):
"""
we want to go get them if an access is attempted and we haven't gotten them already
"""
if not self.__all_location_provider_values:
self.all_location_provider_values = self._get_all_location_provider_values()
return self.__all_location_provider_values
@all_location_provider_values.setter
def all_location_provider_values(self, val):
self.__all_location_provider_values = [*val]
def _get_all_location_provider_values(self):
values_list, current_line = [], ''
for location in self.all_locations:
values_list.extend(self.location_provider_values(location['site_id']))
current_line = self._print_stream(location['site_id'], current_line)
self.all_location_provider_values = [*values_list]
return values_list
def _print_stream(self, current_item, current_line):
new_line = f'{current_line + " " if current_line else ""}{current_item}'
if len(new_line) > 79:
new_line = f'{current_item} '
sys.stdout.write('\n')
sys.stdout.write(new_line)
else:
sys.stdout.write(f'{current_item} ')
sys.stdout.flush()
return new_line
    def location_provider_info(self, site_id):
"""
Sends a provider info request info for all provider_ids for one site_id
:param site_id_in: (str) site_id to get provider info for
:return: xml with a provider info response
"""
xml_string = self.pconn.base_xml
xml_string = xmlmanip.inject_tags(xml_string, injection_index=2, providers="")
provider_dict = {
'provider': {
"action": "info", "__inner_tag": {
"site_id": site_id,
"provider_primary_key": "ALL",
}
}
}
xml_string = xmlmanip.inject_tags(xml_string, parent_tag="providers", **provider_dict)
return self.pconn.MaintainProviders(xml_string).encode('utf-8')
    def location_provider_values(self, site_id):
location_provider_info_response = self.location_provider_info(site_id)
location_provider_info_schema = xmlmanip.XMLSchema(location_provider_info_response)
location_provider_values = location_provider_info_schema.search(site_id__ne='')
return location_provider_values
    def join_all_locations_with_all_providers(self):
normalized_provider_location_values = self.all_location_provider_values
normalized_provider_location_values_df = pandas.DataFrame(normalized_provider_location_values)
provider_info_df = pandas.DataFrame(self.all_providers)
        joined_df = normalized_provider_location_values_df.merge(
            provider_info_df, how='inner', on=['provider_primary_key', 'emp_id'])
return joined_df
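# A minimal usage sketch (hypothetical connection arguments):
#
#     pl = ProviderLocations(ProviderConnection(...), LocationConnection(...))
#     df = pl.join_all_locations_with_all_providers()
#     df.head()  # one row per (site, provider) pairing, merged with provider info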