Fennel: /home/pub/open/dev/fennel/lucidera/colstore/LcsClusterReplaceExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/lucidera/colstore/LcsClusterReplaceExecStream.h" 00024 #include "fennel/lucidera/colstore/LcsClusterReader.h" 00025 #include "fennel/segment/SegmentFactory.h" 00026 #include "fennel/btree/BTreeBuilder.h" 00027 #include "fennel/exec/ExecStreamBufAccessor.h" 00028 00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReplaceExecStream.cpp#6 $"); 00030 00031 void LcsClusterReplaceExecStream::prepare( 00032 LcsClusterReplaceExecStreamParams const &params) 00033 { 00034 LcsClusterAppendExecStream::prepare(params); 00035 newClusterRootParamId = params.rootPageIdParamId; 00036 00037
00038
00039 origRootPageId = treeDescriptor.rootPageId; 00040 } 00041 00042 void LcsClusterReplaceExecStream::initTupleLoadParams( 00043 const TupleProjection &inputProj) 00044 { 00045 numColumns = inputProj.size() - 1; 00046 00047 projInputTupleDesc.projectFrom(tableColsTupleDesc, inputProj); 00048 projInputTupleData.compute(projInputTupleDesc); 00049 00050
00051
00052
00053
00054
00055
00056
00057 pOrigClusterReader = 00058 SharedLcsClusterReader(new LcsClusterReader(treeDescriptor)); 00059 TupleProjection proj; 00060 proj.resize(numColumns); 00061 for (uint i = 0; i < numColumns; i++) { 00062 proj[i] = i; 00063 } 00064 pOrigClusterReader->initColumnReaders(numColumns, proj); 00065 00066
00067
00068 std::copy(inputProj.begin() + 1, inputProj.end(), proj.begin()); 00069 TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor(); 00070 clusterColsTupleAccessor.bind(inputAccessor, proj); 00071 clusterColsTupleDesc.projectFrom(pInAccessor->getTupleDesc(), proj); 00072 clusterColsTupleData.compute(clusterColsTupleDesc); 00073 00074 attrAccessors.resize(clusterColsTupleDesc.size()); 00075 for (uint i = 0; i < clusterColsTupleDesc.size(); i++) { 00076 attrAccessors[i].compute(clusterColsTupleDesc[i]); 00077 } 00078 00079 origClusterTupleData.computeAndAllocate(clusterColsTupleDesc); 00080 00081
00082 colTupleDesc.reset(new TupleDescriptor[numColumns]); 00083 for (int i = 0; i < numColumns; i++) { 00084
00085 colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i + 1]]); 00086 } 00087 } 00088 00089 void LcsClusterReplaceExecStream::getResourceRequirements( 00090 ExecStreamResourceQuantity &minQuantity, 00091 ExecStreamResourceQuantity &optQuantity) 00092 { 00093 LcsClusterAppendExecStream::getResourceRequirements( 00094 minQuantity, 00095 optQuantity); 00096 00097
00098
00099
00100 minQuantity.nCachePages += 2; 00101 00102 optQuantity = minQuantity; 00103 } 00104 00105 void LcsClusterReplaceExecStream::open(bool restart) 00106 { 00107 newData = false; 00108 00109
00110
00111 LcsClusterAppendExecStream::open(restart); 00112 00113
00114 origNumRows = pOrigClusterReader->getNumRows(); 00115 00116 if (!restart) { 00117
00118
00119 if (opaqueToInt(newClusterRootParamId) > 0) { 00120 pDynamicParamManager->createParam( 00121 newClusterRootParamId, 00122 pInAccessor->getTupleDesc()[0]); 00123 } 00124 00125
00126
00127 pSnapshotSegment = 00128 SegmentFactory::getSnapshotSegment( 00129 treeDescriptor.segmentAccessor.pSegment); 00130 assert(pSnapshotSegment != NULL); 00131 } 00132 00133 if (opaqueToInt(newClusterRootParamId) > 0) { 00134 TupleDatum rootPageIdDatum; 00135 rootPageIdDatum.pData = (PConstBuffer) &(treeDescriptor.rootPageId); 00136 rootPageIdDatum.cbData = sizeof(treeDescriptor.rootPageId); 00137 pDynamicParamManager->writeParam( 00138 newClusterRootParamId, 00139 rootPageIdDatum); 00140 } 00141 00142 pOrigClusterReader->open(); 00143 currLoadRid = LcsRid(0); 00144 currInputRid = LcsRid(MAXU); 00145 needTuple = true; 00146 } 00147 00148 ExecStreamResult LcsClusterReplaceExecStream::getTupleForLoad() 00149 { 00150
00151
00152 if (needTuple) { 00153 return EXECRC_YIELD; 00154 } 00155 00156 if (pInAccessor->getState() == EXECBUF_EOS) { 00157
00158
00159
00160
00161
00162
00163
00164
00165 if (newData) { 00166 return EXECRC_EOS; 00167 } 00168 if (opaqueToInt(currLoadRid) < origNumRows) { 00169 readOrigClusterRow(); 00170 needTuple = false; 00171
00172 initLoad(); 00173 return EXECRC_YIELD; 00174 } else { 00175 pSnapshotSegment->versionPage( 00176 origRootPageId, 00177 treeDescriptor.rootPageId); 00178 return EXECRC_EOS; 00179 } 00180 } 00181 00182 if (pInAccessor->demandData()) { 00183 return EXECRC_BUF_UNDERFLOW; 00184 } 00185 00186
00187
00188 if (newData) { 00189 treeDescriptor.rootPageId = NULL_PAGE_ID; 00190 BTreeBuilder builder( 00191 treeDescriptor, 00192 treeDescriptor.segmentAccessor.pSegment); 00193 builder.createEmptyRoot(); 00194 treeDescriptor.rootPageId = builder.getRootPageId(); 00195 newData = true; 00196 } 00197 00198 initLoad(); 00199 00200 if (currLoadRid == LcsRid(0) || currLoadRid > currInputRid) { 00201 assert(pInAccessor->isTupleConsumptionPending()); 00202 pInAccessor->unmarshalProjectedTuple(projInputTupleData); 00203 currInputRid = 00204 *reinterpret_cast<LcsRid const *> (projInputTupleData[0].pData); 00205 } 00206 00207
00208
00209
00210 if (currInputRid > currLoadRid) { 00211 readOrigClusterRow(); 00212 } else { 00213 assert(currInputRid == currLoadRid); 00214 clusterColsTupleAccessor.unmarshal(clusterColsTupleData); 00215 } 00216 00217 needTuple = false; 00218 return EXECRC_YIELD; 00219 } 00220 00221 void LcsClusterReplaceExecStream::readOrigClusterRow() 00222 { 00223 origClusterTupleData.resetBuffer(); 00224 00225
00226
00227
00228
00229 bool needSync = true; 00230 if (pOrigClusterReader->isPositioned() && 00231 currLoadRid < pOrigClusterReader->getRangeEndRid()) 00232 { 00233 needSync = false; 00234 } 00235 bool rc = pOrigClusterReader->position(currLoadRid); 00236 assert(rc); 00237 for (uint i = 0; i < pOrigClusterReader->nColsToRead; i++) { 00238 if (needSync) { 00239 pOrigClusterReader->clusterCols[i].sync(); 00240 } 00241 PBuffer colValue = pOrigClusterReader->clusterCols[i].getCurrentValue(); 00242 attrAccessors[i].loadValue(origClusterTupleData[i], colValue); 00243 clusterColsTupleData[i] = origClusterTupleData[i]; 00244 } 00245 } 00246 00247 void LcsClusterReplaceExecStream::postProcessTuple() 00248 { 00249
00250
00251 if (currInputRid == currLoadRid) { 00252 LcsClusterAppendExecStream::postProcessTuple(); 00253 } 00254 currLoadRid++; 00255 needTuple = true; 00256 } 00257 00258 void LcsClusterReplaceExecStream::close() 00259 { 00260 LcsClusterAppendExecStream::close(); 00261 pOrigClusterReader->close(); 00262 } 00263 00264 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReplaceExecStream.cpp#6 $"); 00265 00266