Fennel: /home/pub/open/dev/fennel/lucidera/colstore/LcsRowScanBaseExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/lucidera/colstore/LcsRowScanBaseExecStream.h" 00024 #include "fennel/exec/ExecStreamBufAccessor.h" 00025 00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanBaseExecStream.cpp#12 $"); 00027 00028 LcsRowScanBaseExecStream::LcsRowScanBaseExecStream() 00029 { 00030 nClusters = 0; 00031 } 00032 00033 void LcsRowScanBaseExecStream::prepare( 00034 LcsRowScanBaseExecStreamParams const &params) 00035 { 00036 ConfluenceExecStream::prepare(params); 00037 00038
00039
00040
00041 00042 nClusters = params.lcsClusterScanDefs.size(); 00043 pClusters.reset(new SharedLcsClusterReader[nClusters]); 00044 00045 uint clusterStart = 0; 00046 uint projCount = 0; 00047 TupleDescriptor allClusterTupleDesc; 00048 TupleProjection newProj, outputProj; 00049 00050 buildOutputProj(outputProj, params); 00051 00052 newProj.resize(outputProj.size()); 00053 00054
00055 TupleDescriptor outputTupleDesc = pOutAccessor->getTupleDesc(); 00056 for (uint i = 0; i < outputProj.size(); i++) { 00057 if (outputProj[i] == LCS_RID_COLUMN_ID) { 00058 newProj[i] = projCount++; 00059 allClusterTupleDesc.push_back(outputTupleDesc[i]); 00060 nonClusterCols.push_back(params.outputProj[i]); 00061 } 00062 } 00063 00064 allSpecial = (nonClusterCols.size() == newProj.size()); 00065 00066 for (uint i = 0; i < nClusters; i++) { 00067 SharedLcsClusterReader &pClu = pClusters[i]; 00068 00069 BTreeExecStreamParams const &bTreeParams = params.lcsClusterScanDefs[i]; 00070 00071 BTreeDescriptor treeDescriptor; 00072 treeDescriptor.segmentAccessor.pSegment = bTreeParams.pSegment; 00073 treeDescriptor.segmentAccessor.pCacheAccessor = 00074 bTreeParams.pCacheAccessor; 00075 treeDescriptor.tupleDescriptor = bTreeParams.tupleDesc; 00076 treeDescriptor.keyProjection = bTreeParams.keyProj; 00077 treeDescriptor.rootPageId = bTreeParams.rootPageId; 00078 treeDescriptor.segmentId = bTreeParams.segmentId; 00079 treeDescriptor.pageOwnerId = bTreeParams.pageOwnerId; 00080 00081 pClu = 00082 SharedLcsClusterReader( 00083 new LcsClusterReader(treeDescriptor, &ridRuns)); 00084 00085
00086
00087 uint clusterEnd = clusterStart + 00088 params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1; 00089 00090
00091
00092
00093 TupleProjection clusterProj; 00094 for (uint j = 0; j < newProj.size(); j++) { 00095 if (outputProj[j] >= clusterStart && 00096 outputProj[j] <= clusterEnd) 00097 { 00098 clusterProj.push_back(outputProj[j] - clusterStart); 00099 newProj[j] = projCount++; 00100 } 00101 } 00102 clusterStart = clusterEnd + 1; 00103 00104
00105
00106
00107
00108 if (allSpecial) { 00109 clusterProj.push_back(0); 00110 } 00111 pClu->initColumnReaders( 00112 params.lcsClusterScanDefs[i].clusterTupleDesc.size(), 00113 clusterProj); 00114 if (allSpecial) { 00115 for (uint j = 0; j < pClu->nColsToRead; j++) { 00116 allClusterTupleDesc.push_back( 00117 params.lcsClusterScanDefs[i]. 00118 clusterTupleDesc[clusterProj[j]]); 00119 } 00120 } 00121 } 00122 00123
00124
00125 00126 for (uint i = 0; i < newProj.size(); i++) { 00127 projDescriptor.push_back(allClusterTupleDesc[newProj[i]]); 00128 } 00129 00130
00131
00132 projMap.resize(newProj.size()); 00133 for (uint i = 0; i < projMap.size(); i++) { 00134 for (uint j = 0; j < newProj.size(); j++) { 00135 if (newProj[j] == i) { 00136 projMap[i] = j; 00137 } 00138 } 00139 } 00140 } 00141 00142 void LcsRowScanBaseExecStream::open(bool restart) 00143 { 00144 ConfluenceExecStream::open(restart); 00145 for (uint i = 0; i < nClusters; i++) { 00146 pClusters[i]->open(); 00147 } 00148 } 00149 00150 void LcsRowScanBaseExecStream::getResourceRequirements( 00151 ExecStreamResourceQuantity &minQuantity, 00152 ExecStreamResourceQuantity &optQuantity) 00153 { 00154 ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity); 00155 00156
00157
00158
00159 minQuantity.nCachePages += (nClusters * 2); 00160 00161 optQuantity = minQuantity; 00162 } 00163 00164 void LcsRowScanBaseExecStream::closeImpl() 00165 { 00166 ConfluenceExecStream::closeImpl(); 00167 for (uint i = 0; i < nClusters; i++) { 00168 pClusters[i]->close(); 00169 } 00170 } 00171 00172 void LcsRowScanBaseExecStream::syncColumns(SharedLcsClusterReader &pScan) 00173 { 00174 for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) { 00175 pScan->clusterCols[iCluCol].sync(); 00176 } 00177 } 00178 00179 bool LcsRowScanBaseExecStream::readColVals( 00180 SharedLcsClusterReader &pScan, 00181 TupleDataWithBuffer &tupleData, 00182 uint colStart) 00183 { 00184 if (allSpecial) { 00185 for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) { 00186
00187
00188 PBuffer curValue = pScan->clusterCols[iCluCol].getCurrentValue(); 00189 uint idx = projMap[colStart + iCluCol]; 00190 00191 attrAccessors[idx].loadValue(tupleData[idx], curValue); 00192 if (pScan->clusterCols[iCluCol].getFilters().hasResidualFilters) { 00193 if (!pScan->clusterCols[iCluCol].applyFilters( 00194 projDescriptor, 00195 tupleData)) 00196 { 00197 return false; 00198 } 00199 } 00200 } 00201 } 00202 return true; 00203 } 00204 00205 void LcsRowScanBaseExecStream::buildOutputProj( 00206 TupleProjection &outputProj, 00207 LcsRowScanBaseExecStreamParams const &params) 00208 { 00209
00210 00211 00212 for (uint i = 0; i < params.outputProj.size(); i++) { 00213 outputProj.push_back(params.outputProj[i]); 00214 } 00215 } 00216 00217 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanBaseExecStream.cpp#12 $"); 00218 00219