# -*- coding: utf-8 -*-
# Experimental data manager
# Loads a csv data file and extracts key information into usable structures for analysis
from __future__ import unicode_literals
import logging
logging.debug('Loading data.py')
# Import PyQt5 classes
from .qt import *
from collections import defaultdict
from copy import deepcopy
import operator
import logging
import pandas as pd
import numpy as np
from PIL import Image
[docs]class DataTreeItem(object):
'''
a python object used to return row/column data, and keep note of
it's parents and/or children
'''
def __init__(self, dso, header, parentItem):
self.dso = dso
self.parentItem = parentItem
self.header = header
self.childItems = []
[docs] def appendChild(self, item):
self.childItems.append(item)
[docs] def child(self, row):
return self.childItems[row]
[docs] def childCount(self):
return len(self.childItems)
[docs] def columnCount(self):
return 2
[docs] def data(self, column):
e = set()
for el in self.dso.entities_t:
e |= set(el) # Add entities to the set
map = {
0: 0,
1: self.dso.manager.v.name,
2: self.dso.name,
3: ', '.join(e - {'NoneType'}),
4: 'x'.join([str(s) for s in self.dso.shape]),
}
if self.dso:
return map[column]
return QVariant()
[docs] def icon(self):
if self.dso.manager.v.plugin.workspace_icon:
return self.dso.manager.v.plugin.workspace_icon
[docs] def parent(self):
return self.parentItem
[docs] def row(self):
if self.parentItem:
return self.parentItem.childItems.index(self)
return 0
[docs]class DataTreeModel(QAbstractItemModel):
'''
a model to display a few names, ordered by sex
'''
def __init__(self, dsos=[], parent=None):
super(DataTreeModel, self).__init__(parent)
self.dsos = dsos
self.HORIZONTAL_HEADERS = ['', 'Source', 'Data', 'Entities', 'Size']
self.rootItem = DataTreeItem(None, "ALL", None)
self.parents = {0: self.rootItem}
self.setupModelData()
[docs] def columnCount(self, parent=None):
if parent and parent.isValid():
return parent.internalPointer().columnCount()
else:
return len(self.HORIZONTAL_HEADERS)
[docs] def data(self, index, role):
if not index.isValid():
return QVariant()
item = index.internalPointer()
if role == Qt.DisplayRole:
return item.data(index.column())
if role == Qt.UserRole:
if item:
return item.dso
if role == Qt.DecorationRole and index.column() == 1:
return item.icon()
return QVariant()
[docs] def index(self, row, column, parent):
if not self.hasIndex(row, column, parent):
return QModelIndex()
if not parent.isValid():
parentItem = self.rootItem
else:
parentItem = parent.internalPointer()
childItem = parentItem.child(row)
if childItem:
return self.createIndex(row, column, childItem)
else:
return QModelIndex()
[docs] def parent(self, index):
if not index.isValid():
return QModelIndex()
childItem = index.internalPointer()
if not childItem:
return QModelIndex()
parentItem = childItem.parent()
if parentItem == self.rootItem:
return QModelIndex()
return self.createIndex(parentItem.row(), 0, parentItem)
[docs] def rowCount(self, parent=QModelIndex()):
if parent.column() > 0:
return 0
if not parent.isValid():
p_Item = self.rootItem
else:
p_Item = parent.internalPointer()
return p_Item.childCount()
[docs] def setupModelData(self):
if self.dsos:
for dso in self.dsos:
newItem = DataTreeItem(dso, "", self.rootItem)
self.rootItem.appendChild(newItem)
[docs] def refresh(self):
self.layoutAboutToBeChanged.emit([], QAbstractItemModel.NoLayoutChangeHint)
ins = set()
for n, item in enumerate(self.rootItem.childItems): # self.parentItem.childItems.index(self)
if item.dso not in self.dsos:
self.removeRow(n)
ins.add(item.dso)
outs = set(self.dsos) - ins
for dso in outs:
newItem = DataTreeItem(dso, "", self.rootItem)
self.rootItem.appendChild(newItem)
self.layoutChanged.emit([], QAbstractItemModel.NoLayoutChangeHint)
# DataManager allows a view/analysis class to handle control of consumable data sources
[docs]class DataManager(QObject):
# Signals
source_updated = pyqtSignal(object)
output_updated = pyqtSignal(object)
consumed = pyqtSignal(tuple, tuple)
unconsumed = pyqtSignal(tuple, tuple)
interfaces_changed = pyqtSignal()
def __init__(self, parent, view, *args, **kwargs):
super(DataManager, self).__init__(*args, **kwargs)
self.m = parent
self.v = view
self.id = self.v.id # Data manager id == that of parent (simplicity; one manager per view)
self.consumer_defs = [] # Holds data-consumer definitions
self.consumes = [] # Holds list of data objects that are consumed
self.i = {} # Inputs: dict of 'interface' tuples: (origin,interface)
self.o = {} # Outputs
self.watchers = defaultdict(set) # List of watchers on each output interface
# Get a dataset through input interface id;
# This provides indirect access to a copy of the object (local link in self.i = {})
[docs] def get(self, interface):
if interface in self.i and self.i[interface] is not None:
# Add ourselves to the watcher for this interface
source_manager, source_interface = self.i[interface]
data = source_manager.o[source_interface]
#dso.manager.watchers[ dso.manager_interface ].add( self )
return deepcopy(data)
return None
[docs] def unget(self, interface):
if interface in self.i:
self._unconsume(interface)
self.i[interface] = None
# Output a dataset through output interface id
# Advertise object for consumption; needs to handle notification of all consumers
# independent of the object itself (so can overwrite instead of warping)
[docs] def put(self, interface, dso, update_consumers=True):
if interface in self.o:
self.o[interface] = dso
# Update consumers / refresh views
#self.o[interface].refresh_interfaces()
#self.o[interface].previously_managed_by.append(self)
#self.notify_watchers(interface)
self.output_updated.emit(interface)
return True
return False
[docs] def unput(self, interface):
logging.debug('Unputting data on interface %s' % interface)
# Trigger _unconsume on all watchers
for w in list(self.watchers[interface]):
for i in w.i.keys():
if w.i[i] is not None:
w.unget(i)
self.watchers[interface] = set()
self.o[interface] = None # DataSet(manager=self) # Empty dso (temp; replace with None later?)
# Get a dataset through output interface id;
[docs] def geto(self, interface):
if interface in self.o:
dso = self.o[interface]
return dso
return False
[docs] def add_output(self, interface, dso=None, is_public=True):
self.o[interface] = None
self.interfaces_changed.emit()
[docs] def remove_output(self, interface):
if interface in self.o:
#self.watchers[interface]
del self.o[interface]
self.notify_watchers(interface)
del self.watchers[interface]
self.interfaces_changed.emit()
return True
return False
[docs] def notify_watchers(self, interface):
for manager in self.watchers[interface]:
for dest_interface, mi in manager.i.items():
if mi:
m, i = mi
if m == self:
manager.source_updated.emit(dest_interface)
# Handle consuming of a data object; assignment to internal tables and processing triggers (plus child-triggers if appropriate)
# Build import/hooks for this consumable object (need interface logic here; standardise where things will end up)
[docs] def can_consume(self, source_manager, source_interface, consumer_defs=None, interface=None):
if consumer_defs is None:
consumer_defs = self.consumer_defs
if interface:
consumer_defs = [d for d in consumer_defs if d.target == interface]
# Don't add data from self manager (infinite loop trigger)
if source_manager == self:
return False
for consumer_def in consumer_defs:
if consumer_def.can_consume(source_manager.o[source_interface]):
return True
return False
[docs] def can_consume_which_of(self, molist, consumer_defs=None):
which = []
for source_manager, source_interface in molist:
if self.can_consume(source_manager, source_interface, consumer_defs):
which.append((source_manager, source_interface))
return which
# Check if a manager has a consumable data object
[docs] def has_consumable(self, manager):
for data in manager.provides:
if self.can_consume(data):
return True
return False
def _unconsume(self, interface):
if self.i[interface] is not None:
source_manager, source_interface = self.i[interface]
if self in source_manager.watchers[source_interface]:
source_manager.watchers[source_interface].remove(self)
del self.i[interface]
self.unconsumed.emit((source_manager, source_interface), (self, interface))
# This is an unchecked consume action; for loading mainly
def _consume_action(self, source_manager, source_interface, interface):
# Remove consumed data to update the source watchers
if interface in self.i:
self._unconsume(interface)
self.i[interface] = (source_manager, source_interface) # Store source as a tuple; we re-do the get rather than storing the actual data
source_manager.watchers[source_interface].add(self)
self.consumed.emit((source_manager, source_interface), (self, interface))
return interface
# Check if we can consume some data, then do it
def _consume(self, source_manager, source_interface, consumer_defs=None):
if consumer_defs is None:
consumer_defs = self.consumer_defs
# Check whether this is allowed (checks manager, checks hierarchy (infinite loopage) )
if not self.can_consume(source_manager, source_interface, consumer_defs):
return False
for consumer_def in consumer_defs:
if consumer_def.can_consume(source_manager.o[source_interface]):
# Remove existing data object link (stop watching)
return self._consume_action(source_manager, source_interface, consumer_def.target)
return False
[docs] def consume(self, source_manager, source_interface):
interface = self._consume(source_manager, source_interface)
if interface:
self.source_updated.emit(interface)
return True
return False
[docs] def consume_any_app(self, app_l):
for a in app_l:
# Iterate all outputs for this app's data manager
for o in a.data.o.keys():
interface = self._consume(a.data, o)
if interface:
self.source_updated.emit(interface)
return a.data.o[o]
return False
[docs] def consume_with(self, data, consumer_def):
if self._consume(data, [consumer_def]):
self.source_updated.emit(consumer_def.target)
return True
[docs] def provide(self, target):
self.provides.append(self.o[target])
[docs] def stop_consuming(self, target):
if target in self.i:
del self.i[target]
[docs] def refresh_consumed_data(self):
self.source_updated.emit(None) # Trigger recalculation
[docs] def reset(self):
for i in list(self.i.keys()):
self.unget(i)
for i in list(self.o.keys()):
self.unput(i)
# Provider/Consumer classes define data availability and requirements for a given dataManager object.
# Object can accept input from any Provider that offers it's Consumer requirements; process it; and then provide it downstream
# view it's own Provider class definition.
[docs]def at_least_one_element_in_common(l1, l2):
return len(set(l1) & set(l2)) > 0
[docs]class DataDefinition(object):
cmp_map = {
'<': operator.lt,
'<=': operator.le,
'=': operator.eq,
'!=': operator.ne,
'>': operator.gt,
'>=': operator.ge,
'aloeic': at_least_one_element_in_common,
}
def __init__(self, target, definition={}, title=None, *args, **kwargs):
super(DataDefinition, self).__init__(*args, **kwargs)
# Store consumer/provider description as entity entries from dict
self.target = target # Target interface for imported data - stored under this in dataManager
self.definition = definition
self.title = title if title else target
[docs] def can_consume(self, data):
logging.debug("Checking can consume object on %s" % self.target)
return self.check(data)
[docs] def check(self, o):
return o is not None
[docs] def get_cmp_fn(self, s):
if type(s) == list:
return self.cmp_map['aloeic'], s
s = str(s) # Treat all input as strings
for k, v in list(self.cmp_map.items()):
if k in s:
return v, s.replace(k, '')
return self.cmp_map['='], s
[docs]class NumpyArrayDataDefinition(DataDefinition):
''' Custom matching definition for numpy arrays '''
[docs] def check(self, o):
return self._check_instance(o) and \
self._check_dimensionality(o)
def _check_instance(self, o):
return isinstance(o, np.ndarray)
def _check_dimensionality(self, o):
if 'shape' not in self.definition:
logging.debug(" not checking shape")
return True
shape = o.shape
d = self.definition['shape']
if len(shape) != len(d):
logging.debug(" dimensionality failure %s %s" % (len(shape), len(d)))
return False
for n, cr in enumerate(d):
if cr is None: # No restriction on this definition
logging.debug(' pass')
continue
cmp_fn, crr = self.get_cmp_fn(cr)
if not cmp_fn(shape[n], int(crr)):
logging.debug(" comparison failure %s %s %s" % (shape[n], cmp_fn, crr))
return False
return True
[docs]class PandasDataDefinition(NumpyArrayDataDefinition):
''' Custom matching definition for pandas dataframes '''
[docs] def check(self, o):
return self._check_instance(o) and \
self._check_dimensionality(o) and \
self._check_columns(o) and \
self._check_index(o)
def _check_columns(self, o):
if 'columns' not in self.definition:
logging.debug(" not checking columns")
return True
vl = []
for m in self.definition['columns']:
if not isinstance(m, tuple):
m = (m, )
for i in m:
vl.append(i in o.columns.names)
return all(vl)
def _check_index(self, o):
if 'index' not in self.definition:
logging.debug(" not checking index")
return True
vl = []
for m in self.definition['index']:
if not isinstance(m, tuple):
m = (m, )
for i in m:
vl.append(m in o.columns.names)
return all(vl)
def _check_instance(self, o):
logging.debug(" check instance")
return isinstance(o, pd.DataFrame)
[docs]class ImageDataDefinition(DataDefinition):
''' Custom matching definition for PIL Images '''
[docs] def check(self, o):
return self._check_instance(o)
def _check_instance(self, o):
return isinstance(o, Image.Image)