Code reorganization for better session management in Moses server. Clients can now close sessions.

This commit is contained in:
Ulrich Germann 2015-08-04 01:59:28 +01:00
parent c5b193346c
commit fc10ad4afb
10 changed files with 209 additions and 66 deletions

View File

@ -63,10 +63,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#include <xmlrpc-c/base.hpp>
#include <xmlrpc-c/registry.hpp>
#include <xmlrpc-c/server_abyss.hpp>
#include "server/Translator.h"
#include "server/Optimizer.h"
#include "server/Updater.h"
#include "moses/parameters/ServerOptions.h"
#include "server/Server.h"
#endif
using namespace std;
@ -147,44 +144,8 @@ Parameter params;
int
run_as_server()
{
#ifdef HAVE_XMLRPC_C
ServerOptions sopts(params);
if (sopts.is_serial) VERBOSE(1,"Running server in serial mode." << endl);
xmlrpc_c::registry myRegistry;
xmlrpc_c::methodPtr const
translator(new MosesServer::Translator(sopts)),
updater(new MosesServer::Updater),
optimizer(new MosesServer::Optimizer);
myRegistry.addMethod("translate", translator);
myRegistry.addMethod("updater", updater);
myRegistry.addMethod("optimize", optimizer);
xmlrpc_c::serverAbyss myAbyssServer(
xmlrpc_c::serverAbyss::constrOpt()
.registryP(&myRegistry)
.portNumber(sopts.port) // TCP port on which to listen
.logFileName(sopts.logfile)
.allowOrigin("*")
.maxConn(sopts.num_threads)
);
XVERBOSE(1,"Listening on port " << sopts.port << endl);
if (sopts.is_serial)
{
while(true) myAbyssServer.runOnce();
}
else myAbyssServer.run();
std::cerr << "xmlrpc_c::serverAbyss.run() returned but should not." << std::endl;
// #pragma message("BUILDING MOSES WITH SERVER SUPPORT")
#else
// #pragma message("BUILDING MOSES WITHOUT SERVER SUPPORT")
std::cerr << "Moses was compiled without server support." << endl;
#endif
return 1;
MosesServer::Server server(params);
return server.run(); // actually: don't return. see Server::run()
}
int

View File

@ -71,7 +71,7 @@ namespace Moses
#ifdef IFVERBOSE
#undef IFVERBOSE
#endif
#define IFVERBOSE(level) if (StaticData::Instance().GetVerboseLevel() >= level)
#define IFVERBOSE(level) if (Moses::StaticData::Instance().GetVerboseLevel() >= level)
#define XVERBOSE(level,str) VERBOSE(level, "[" << HERE << "] " << str)
#define HERE __FILE__ << ":" << __LINE__
#define FEATUREVERBOSE(level,str) FEATUREVERBOSE2(level, "[" << GetScoreProducerDescription() << "] " << str)

View File

@ -0,0 +1,30 @@
// -*- mode: c++; indent-tabs-mode: nil; tab-width: -*-
#include "CloseSession.h"
#include "Server.h"
#include "moses/StaticData.h"
namespace MosesServer
{
CloseSession::
CloseSession(Server& server)
: m_server(server)
{ }
void
CloseSession::
execute(xmlrpc_c::paramList const& paramList,
xmlrpc_c::value * const retvalP)
{
typedef std::map<std::string, xmlrpc_c::value> params_t;
paramList.verifyEnd(1); // ??? UG
params_t const& params = paramList.getStruct(0);
params_t::const_iterator si = params.find("session-id");
if (si != params.end())
{
uint64_t session_id = xmlrpc_c::value_int(si->second);
m_server.delete_session(session_id);
*retvalP = xmlrpc_c::value_string("Session closed");
}
}
}

View File

@ -0,0 +1,24 @@
// -*- mode: c++; indent-tabs-mode: nil; tab-width: -*-
#pragma once
#include <xmlrpc-c/base.hpp>
#include <xmlrpc-c/registry.hpp>
#include <xmlrpc-c/server_abyss.hpp>
#ifndef WITH_THREADS
#pragma message("COMPILING WITHOUT THREADS!")
#endif
namespace MosesServer
{
class Server;
class
CloseSession : public xmlrpc_c::method
{
Server& m_server;
public:
CloseSession(Server& server);
void execute(xmlrpc_c::paramList const& paramList,
xmlrpc_c::value * const retvalP);
};
}

72
moses/server/Server.cpp Normal file
View File

@ -0,0 +1,72 @@
// -*- mode: c++; indent-tabs-mode: nil; tab-width: 2 -*-
#include "Server.h"
namespace MosesServer
{
Server::
Server(Moses::Parameter& params)
: m_server_options(params),
m_updater(new Updater),
m_optimizer(new Optimizer),
m_translator(new Translator(*this)),
m_close_session(new CloseSession(*this))
{
m_registry.addMethod("translate", m_translator);
m_registry.addMethod("updater", m_updater);
m_registry.addMethod("optimize", m_optimizer);
m_registry.addMethod("close_session", m_close_session);
}
int
Server::
run()
{
#ifdef HAVE_XMLRPC_C
xmlrpc_c::serverAbyss myAbyssServer
(xmlrpc_c::serverAbyss::constrOpt()
.registryP(&m_registry)
.portNumber(m_server_options.port) // TCP port on which to listen
.logFileName(m_server_options.logfile)
.allowOrigin("*")
.maxConn(m_server_options.num_threads));
XVERBOSE(1,"Listening on port " << m_server_options.port << endl);
if (m_server_options.is_serial)
{
VERBOSE(1,"Running server in serial mode." << endl);
while(true) myAbyssServer.runOnce();
}
else myAbyssServer.run();
std::cerr << "xmlrpc_c::serverAbyss.run() returned but should not." << std::endl;
// #pragma message("BUILDING MOSES WITH SERVER SUPPORT")
#else
// #pragma message("BUILDING MOSES WITHOUT SERVER SUPPORT")
std::cerr << "Moses was compiled without server support." << endl;
#endif
return 1;
}
Moses::ServerOptions const&
Server::
options() const
{
return m_server_options;
}
Session const&
Server::
get_session(uint64_t session_id)
{
return m_session_cache[session_id];
}
void
Server::
delete_session(uint64_t const session_id)
{
return m_session_cache.erase(session_id);
}
}

40
moses/server/Server.h Normal file
View File

@ -0,0 +1,40 @@
// -*- mode: c++; indent-tabs-mode: nil; tab-width: 2 -*-
#pragma once
#ifdef HAVE_XMLRPC_C
#include <xmlrpc-c/base.hpp>
#include <xmlrpc-c/registry.hpp>
#include <xmlrpc-c/server_abyss.hpp>
#include "Translator.h"
#include "Optimizer.h"
#include "Updater.h"
#include "CloseSession.h"
#include "Session.h"
#include "moses/parameters/ServerOptions.h"
#endif
namespace MosesServer
{
class Server
{
Moses::ServerOptions m_server_options;
SessionCache m_session_cache;
xmlrpc_c::registry m_registry;
xmlrpc_c::methodPtr const m_updater;
xmlrpc_c::methodPtr const m_optimizer;
xmlrpc_c::methodPtr const m_translator;
xmlrpc_c::methodPtr const m_close_session;
public:
Server(Moses::Parameter& params);
int run();
void delete_session(uint64_t const session_id);
Moses::ServerOptions const&
options() const;
Session const&
get_session(uint64_t session_id);
};
}

View File

@ -4,6 +4,7 @@
#include "moses/ContextScope.h"
#include <sys/time.h>
#include <boost/unordered_map.hpp>
#ifdef WITH_THREADS
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/locks.hpp>
@ -17,6 +18,8 @@ namespace MosesServer{
time_t last_access;
boost::shared_ptr<Moses::ContextScope> const scope; // stores local info
Session(uint64_t const session_id)
: id(session_id), scope(new Moses::ContextScope)
{
@ -53,6 +56,15 @@ namespace MosesServer{
std::pair<uint64_t, Session> foo(id, Session(id));
return m_cache.insert(foo).first->second;
}
void
erase(uint32_t const id)
{
boost::unique_lock<boost::shared_mutex> lock(m_lock);
m_cache.erase(id);
}
};

View File

@ -39,7 +39,8 @@ void
TranslationRequest::
Run()
{
parse_request(m_paramList.getStruct(0));
std::map<std::string,xmlrpc_c::value>const& params = m_paramList.getStruct(0);
parse_request(params);
// cerr << "SESSION ID" << ret->m_session_id << endl;
if (m_session_id)
{
@ -253,7 +254,7 @@ parse_request(std::map<std::string, xmlrpc_c::value> const& params)
m_source_string = xmlrpc_c::value_string(si->second);
XVERBOSE(1,"Input: " << m_source_string << endl);
si = params.find("session_id");
si = params.find("session-id");
if (si != params.end())
m_session_id = xmlrpc_c::value_int(si->second);
else
@ -386,7 +387,7 @@ run_phrase_decoder()
pack_hypothesis(manager.GetBestHypothesis(), "text", m_retData);
if (m_session_id)
m_retData["session_id"] = xmlrpc_c::value_int(m_session_id);
m_retData["session-id"] = xmlrpc_c::value_int(m_session_id);
if (m_withGraphInfo) insertGraphInfo(manager,m_retData);
if (m_withTopts) insertTranslationOptions(manager,m_retData);

View File

@ -1,5 +1,6 @@
#include "Translator.h"
#include "TranslationRequest.h"
#include "Server.h"
namespace MosesServer
{
@ -8,9 +9,9 @@ using namespace std;
using namespace Moses;
Translator::
Translator(Moses::ServerOptions const& sopts)
: m_threadPool(sopts.num_threads),
m_server_options(sopts)
Translator(Server& server)
: m_threadPool(server.options().num_threads),
m_server(server)
{
// signature and help strings are documentation -- the client
// can query this information with a system.methodSignature and
@ -39,8 +40,7 @@ Session const&
Translator::
get_session(uint64_t const id)
{
return m_session_cache[id];
return m_server.get_session(id);
}
}

View File

@ -3,7 +3,7 @@
#include "moses/ThreadPool.h"
#include "moses/parameters/ServerOptions.h"
#include "session.h"
#include "Session.h"
#include <xmlrpc-c/base.hpp>
#include <xmlrpc-c/registry.hpp>
#include <xmlrpc-c/server_abyss.hpp>
@ -12,20 +12,23 @@
#endif
namespace MosesServer
{
class
Translator : public xmlrpc_c::method
{
Moses::ServerOptions m_server_options;
public:
Translator(Moses::ServerOptions const& sopts);
void execute(xmlrpc_c::paramList const& paramList,
xmlrpc_c::value * const retvalP);
Session const& get_session(uint64_t session_id);
private:
Moses::ThreadPool m_threadPool;
SessionCache m_session_cache;
};
class Server;
class
Translator : public xmlrpc_c::method
{
Server& m_server;
// Moses::ServerOptions m_server_options;
public:
Translator(Server& server);
void execute(xmlrpc_c::paramList const& paramList,
xmlrpc_c::value * const retvalP);
Session const& get_session(uint64_t session_id);
private:
Moses::ThreadPool m_threadPool;
};
}