SimplTransport.cc Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 00025 00026 00027 00028 00029 00030 00031 00032 00033 00034 00035 #include <stdlib.h> 00036 #include <stdio.h> 00037 #include <string.h> 00038 00039 #include 00040 #include 00041 00042 #include <mps/mps.h> 00043 #include <mps/transport_simpl.h> 00044 00045 extern "C" { 00046 #include <simpl.h> 00047 } 00048 00049 #define MPS_MESSAGE_MAXLEN 8192
00050 00051 namespace MPS { 00052 00053 SimplTransport *SimplTransport::instance = 0; 00054 00055 00056 00057 static string toString(int x) { 00058 char buf[20]; 00059 sprintf(buf, "%d", x); 00060 return buf; 00061 } 00062 00063 00064 00065 class SimplTransport::SimplIOStream: public MPS::InputStream, public MPS::OutputStream { 00066 private: 00067 bool useReply;
00068 FCID other;
00069 char msg[MPS_MESSAGE_MAXLEN];
00070 int pos;
00071 int msglen;
00072 00073 protected: 00074 virtual string referenceTransport() const { return "simpl"; } 00075 00076 public: 00077 00078 00079 00080 00081 00082 00083 00084 SimplIOStream(FCID const &target, bool _u) 00085 : useReply(_u), 00086 other(target), 00087 pos(0), 00088 msglen(0) 00089 {} 00090 00091 00092 00093 00094 00095 SimplIOStream() 00096 : useReply(true), 00097 pos(0) 00098 { 00099 other = * (FCID *) SimplTransport::popMessage(msg, msglen); 00100 00101 if (other.slot == -1) 00102 throw MPS::MPSException("SIMPL Receive failed"); 00103 } 00104 00105 00106 FCID getOther() const { return other; } 00107 00108 00109 virtual int read() { 00110 if (pos >= msglen) { 00111 throw MPS::MPSConnectionClosedException("reading from a SIMPL message"); 00112 } 00113 00114 return msg[pos++]; 00115 } 00116 00117 00118 virtual void flush() { 00119 if (useReply) { 00120 if (Reply(&other, (void *) getBody(), getLength()) == -1) 00121 throw MPS::MPSException("SIMPL Reply failed"); 00122 } else { 00123 int replylen = Send(&other, const_cast<char *>(getBody()), msg, getLength(), sizeof(msg)); 00124 00125 if (replylen == -1) 00126 throw MPS::MPSException("SIMPL Send failed"); 00127 if (replylen > 0) 00128 SimplTransport::pushMessage(&other, msg, replylen); 00129 } 00130 00131 MPS::OutputStream::flush(); 00132 } 00133 }; 00134 00135 00136 00137 class SimplTransport::SimplConnection: public MPS::Connection { 00138 private: 00139 int oid;
00140 FCID other;
00141 00142 public: 00143 00144 00145 00146 00147 00148 00149 00150 SimplConnection(int _oid, FCID const &_other, MPS::Address const &a) 00151 : MPS::Connection(a), 00152 oid(_oid), 00153 other(_other) 00154 {} 00155 00156 00157 00158 00159 virtual MPS::OutputStream *getOutputStream() { 00160 MPS::OutputStream *stream = new SimplIOStream(other, false); 00161 stream->writeint(oid); 00162 return stream; 00163 } 00164 00165 00166 00167 00168 virtual MPS::InputStream *getInputStream() { 00169 MPS::InputStream *stream = new SimplIOStream(); 00170 int inputOid = stream->readint(); 00171 00172 if (inputOid != oid) { 00173 throw MPS::MPSException("OID mismatch in SimplTransport::SimplConnection, got " + 00174 toString(inputOid) + " expected " + toString(oid)); 00175 } 00176 00177 return stream; 00178 } 00179 00180 virtual void releaseOutputStream(MPS::OutputStream *stream) { delete stream; } 00181 virtual void releaseInputStream(MPS::InputStream *stream) { delete stream; } 00182 }; 00183 00184 00185 00186 SimplTransport::SimplTransport(string const &_simplName) 00187 : Transport("simpl"), 00188 nextOid(1), 00189 simplName(_simplName), 00190 serverMap() 00191 { 00192 if (name_attach(const_cast<char *>(simplName.c_str()), MPS_MESSAGE_MAXLEN, 0) == -1) 00193 throw MPSException("Could not attach to SIMPL name " + simplName); 00194 } 00195 00196 00197 00198 ref SimplTransport::connectTo(Address const &connectionSpec) 00199 { 00200 if (connectionSpec.getParamCount() != 2) 00201 throw MPS::MPSException("Simpl transport requires 2 Address parameters"); 00202 00203 string remoteName = connectionSpec.getParam(0); 00204 int oid = atoi(connectionSpec.getParam(1).c_str()); 00205 00206 FCID other = name_locate(const_cast<char *>(remoteName.c_str())); 00207 if (other.slot == -1) 00208 throw MPS::MPSException("Simpl name_locate of " + remoteName + " failed"); 00209 00210 return new SimplConnection(oid, other, connectionSpec); 00211 } 00212 00213 string SimplTransport::registerServer(Server *server, Address const &spec) 00214 { 00215 if (spec.getParamCount() > 0) { 00216 throw MPS::MPSException("Simpl transport does not allow self-selection of simplName or OID"); 00217 } 00218 00219 int thisOid = nextOid++; 00220 00221 serverMap[thisOid] = server; 00222 00223 Address newSpec("mps:simpl"); 00224 newSpec.setParam(0, instance->simplName); 00225 newSpec.setParam(1, toString(thisOid)); 00226 00227 return newSpec.getResolvedName(); 00228 } 00229 00230 string SimplTransport::deregisterServer(Server *server, Address const &spec) { 00231
00232 throw MPSException("SimplTransport::deregisterServer not yet implemented"); 00233 } 00234 00235 00236 00237 struct QueuedMessage_t { 00238 FCID sender; 00239 char *msg; 00240 int msglen; 00241 00242 QueuedMessage_t() 00243 : msg(0), 00244 msglen(0) 00245 {} 00246 00247 QueuedMessage_t(FCID const &s, char const *m, int len) 00248 : sender(s), 00249 msg(new char[len]), 00250 msglen(len) 00251 { 00252 memcpy(msg, m, len); 00253 } 00254 00255 ~QueuedMessage_t() { 00256 clean(); 00257 } 00258 00259 private: 00260 QueuedMessage_t(QueuedMessage_t const &other); 00261 QueuedMessage_t const &operator=(QueuedMessage_t const &other); 00262 00263 void clean() { 00264 if (msg) { 00265 delete [] msg; 00266 msg = 0; 00267 } 00268 } 00269 }; 00270 00271 static deque<QueuedMessage_t *> msgQueue; 00272 00273 void SimplTransport::pushMessage(void *sender, char const msg, int msglen) { 00274 msgQueue.push_back(new QueuedMessage_t( (FCID *) sender, msg, msglen)); 00275 } 00276 00277 void *SimplTransport::popMessage(char *msg, int &msglen) { 00278 static FCID sender; 00279 00280 if (msgQueue.empty()) { 00281 sender = Receive(msg, &msglen); 00282 } else { 00283 QueuedMessage_t *qm = msgQueue.front(); 00284 msgQueue.pop_front(); 00285 00286 msglen = qm->msglen; 00287 memcpy(msg, qm->msg, msglen); 00288 sender = qm->sender; 00289 00290 delete qm; 00291 } 00292 00293 return &sender; 00294 } 00295 00296 void SimplTransport::mainloop() { 00297 while (1) { 00298 SimplIOStream input; 00299 int oid = input.readint(); 00300 00301 serverMap_t::iterator i = instance->serverMap.find(oid); 00302 if (i == instance->serverMap.end()) { 00303 throw MPSException("Unknown OID " + toString(oid) + " received"); 00304 } 00305 00306 Server *server = (*i).second; 00307 FCID sender = input.getOther(); 00308 00309 SimplIOStream output(sender, true); 00310 output.writeint(oid); 00311 int pos = output.getLength(); 00312 00313 server->dispatch(input, output); 00314 00315 if (output.getLength() > pos) { 00316 output.flush(); 00317 } else { 00318 if (Reply(&sender, 0, 0) == -1) 00319 throw MPS::MPSException("SIMPL Reply failed in mainloop"); 00320 } 00321 } 00322 } 00323 00324 }