OregonCore  revision fb2a440-git
Your Favourite TBC server
WorldSocketMgr.cpp
Go to the documentation of this file.
1 /*
2  * This file is part of the OregonCore Project. See AUTHORS file for Copyright information
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License as published by the
6  * Free Software Foundation; either version 2 of the License, or (at your
7  * option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12  * more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program. If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 #include "WorldSocketMgr.h"
19 
20 #include <ace/ACE.h>
21 #include <ace/Log_Msg.h>
22 #include <ace/Reactor.h>
23 #include <ace/Reactor_Impl.h>
24 #include <ace/TP_Reactor.h>
25 #include <ace/Dev_Poll_Reactor.h>
26 #include <ace/Guard_T.h>
27 #include <ace/Atomic_Op.h>
28 #include <ace/os_include/arpa/os_inet.h>
29 #include <ace/os_include/netinet/os_tcp.h>
30 #include <ace/os_include/sys/os_types.h>
31 #include <ace/os_include/sys/os_socket.h>
32 
33 #include <set>
34 
35 #include "Log.h"
36 #include "Common.h"
37 #include "Config/Config.h"
38 #include "Database/DatabaseEnv.h"
39 #include "WorldSocket.h"
40 
// ReactorRunnable: an ACE_Task_Base-derived task that owns one ACE_Reactor and
// a set of WorldSocket pointers. Sockets passed to AddSocket() are parked in
// m_NewSockets under m_NewSockets_Lock; svc() later moves them into m_Sockets
// and calls Update() on each of them after every reactor pass.
//
// NOTE(review): this text is an extracted source listing. Every line still
// carries the listing's own line number, and listing lines 50, 124, 151, 184
// and 197 are absent from this extract (see the notes at each gap).
46 class ReactorRunnable : protected ACE_Task_Base
47 {
48  public:
49 
// NOTE(review): the constructor declarator (listing line 50) is missing here;
// what follows is its member-initializer list and body.
51  m_Reactor(0),
52  m_Connections(0),
53  m_ThreadId(-1)
54  {
55  ACE_Reactor_Impl* imp = 0;
56 
// Pick the reactor implementation at compile time: ACE_Dev_Poll_Reactor when
// ACE reports epoll or /dev/poll support, ACE_TP_Reactor otherwise.
57  #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
58 
59  imp = new ACE_Dev_Poll_Reactor();
60 
61  imp->max_notify_iterations (128);
62  imp->restart (1);
63 
64  #else
65 
66  imp = new ACE_TP_Reactor();
67  imp->max_notify_iterations (128);
68 
69  #endif
70 
// NOTE(review): the second argument (1) presumably tells ACE_Reactor to delete
// `imp` itself -- confirm against the ACE_Reactor constructor documentation.
71  m_Reactor = new ACE_Reactor (imp, 1);
72  }
73 
// Ends the event loop, waits for the task to finish, then frees the reactor.
74  virtual ~ReactorRunnable()
75  {
76  Stop();
77  Wait();
78 
79  delete m_Reactor;
80  }
81 
// Asks the reactor to end its event loop; svc() tests this state in its
// while-condition.
82  void Stop()
83  {
84  m_Reactor->end_reactor_event_loop();
85  }
86 
// Activates the task. Returns -1 without doing anything when m_ThreadId is no
// longer -1; otherwise stores the result of activate() in m_ThreadId and
// returns that same value.
87  int Start()
88  {
89  if (m_ThreadId != -1)
90  return -1;
91 
92  return (m_ThreadId = activate());
93  }
94 
// Forwards to ACE_Task_Base::wait().
95  void Wait()
96  {
97  ACE_Task_Base::wait();
98  }
99 
// Current value of the connection counter, converted to long.
100  long Connections()
101  {
102  return static_cast<long> (m_Connections.value());
103  }
104 
// Queues `sock` for this runnable: under m_NewSockets_Lock it bumps the
// connection counter, adds a reference to the socket, points the socket at
// this runnable's reactor and inserts it into m_NewSockets.
// Returns 0 on that path; -1 is the value handed to ACE_GUARD_RETURN for the
// case where the guard cannot be taken.
105  int AddSocket (WorldSocket* sock)
106  {
107  ACE_GUARD_RETURN (ACE_Thread_Mutex, Guard, m_NewSockets_Lock, -1);
108 
109  ++m_Connections;
110  sock->AddReference();
111  sock->reactor (m_Reactor);
112  m_NewSockets.insert (sock);
113 
114  return 0;
115  }
116 
// Accessor for the reactor owned by this runnable.
117  ACE_Reactor* GetReactor()
118  {
119  return m_Reactor;
120  }
121 
122  protected:
123 
// NOTE(review): the declarator for this body (listing line 124) is missing;
// svc() below calls AddNewSockets(), which is presumably this function.
// Drains m_NewSockets while holding m_NewSockets_Lock: sockets that are
// already closed lose their reference and are uncounted, all others are
// inserted into m_Sockets. The pending set is cleared afterwards.
125  {
126  ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock);
127 
128  if (m_NewSockets.empty())
129  return;
130 
131  for (SocketSet::iterator i = m_NewSockets.begin(); i != m_NewSockets.end(); ++i)
132  {
133  WorldSocket* sock = (*i);
134 
135  if (sock->IsClosed())
136  {
137  sock->RemoveReference();
138  --m_Connections;
139  }
140  else
141  m_Sockets.insert (sock);
142  }
143 
144  m_NewSockets.clear();
145  }
146 
// Service loop of the task. Until the reactor reports its event loop as done,
// it runs the reactor for up to 100000 microseconds, adopts newly queued
// sockets and calls Update() on every socket in m_Sockets; a socket whose
// Update() returns -1 is closed, dereferenced, uncounted and erased.
// Always returns 0.
147  virtual int svc()
148  {
149  DEBUG_LOG ("Network Thread Starting");
150 
// NOTE(review): listing line 151 is missing from this extract.
152 
153  ACE_ASSERT (m_Reactor);
154 
155  SocketSet::iterator i, t;
156 
157  while (!m_Reactor->reactor_event_loop_done())
158  {
159  // Do not hoist this out of the loop:
160  // run_reactor_event_loop modifies the interval it is given.
161  ACE_Time_Value interval (0, 100000);
162 
// A return value of -1 from the reactor terminates the service loop.
163  if (m_Reactor->run_reactor_event_loop (interval) == -1)
164  break;
165 
166  AddNewSockets();
167 
168  for (i = m_Sockets.begin(); i != m_Sockets.end();)
169  {
170  if ((*i)->Update() == -1)
171  {
// Step `i` past the element first so that erasing `t` leaves `i` valid.
172  t = i;
173  ++i;
174  (*t)->CloseSocket();
175  (*t)->RemoveReference();
176  --m_Connections;
177  m_Sockets.erase (t);
178  }
179  else
180  ++i;
181  }
182  }
183 
// NOTE(review): listing line 184 is missing from this extract.
185 
186  DEBUG_LOG ("Network Thread Exitting");
187 
188  return 0;
189  }
190 
191  private:
192  typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> AtomicInt;
193  typedef std::set<WorldSocket*> SocketSet;
194 
// Reactor created in the constructor and deleted in the destructor.
195  ACE_Reactor* m_Reactor;
// Incremented in AddSocket(), decremented whenever a socket is dropped.
196  AtomicInt m_Connections;
// NOTE(review): listing line 197 is missing; the m_ThreadId member used by the
// constructor and Start() is presumably declared there.
198 
// Sockets that svc() calls Update() on.
199  SocketSet m_Sockets;
200 
// Sockets queued by AddSocket() and not yet adopted; guarded by the mutex below.
201  SocketSet m_NewSockets;
202  ACE_Thread_Mutex m_NewSockets_Lock;
203 };
204 
// NOTE(review): the declarator for this member-initializer list (listing line
// 205) is missing from this extract; judging by the members it is presumably
// the WorldSocketMgr constructor.
// Initial state: no network threads, no acceptor, kernel send-buffer size -1
// (StartReactiveIO documents -1 as "use default"), a 65536 user-space output
// buffer size and TCP_NODELAY enabled.
206  m_NetThreads(0),
207  m_NetThreadsCount(0),
208  m_SockOutKBuff(-1),
209  m_SockOutUBuff(65536),
210  m_UseNoDelay(true),
211  m_Acceptor (0)
212 {
213 }
214 
// NOTE(review): the declarator for this body (listing line 215) is missing from
// this extract; it is presumably the WorldSocketMgr destructor.
// Frees the network-thread array (array delete) and the acceptor object.
216 {
217  delete[] m_NetThreads;
218  delete m_Acceptor;
219 }
220 
// Reads the network settings from the configuration, opens the acceptor on
// `address`:`port` and starts every network thread.
//
// port    - port passed to the ACE_INET_Addr the acceptor listens on.
// address - address string passed to the same ACE_INET_Addr.
//
// Returns 0 on success; -1 when Network.Threads or Network.OutUBuff is not
// positive, or when opening the acceptor fails.
221 int
222 WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
223 {
224  m_UseNoDelay = sConfig.GetBoolDefault ("Network.TcpNodelay", true);
225 
226  int num_threads = sConfig.GetIntDefault ("Network.Threads", 1);
227 
228  if (num_threads <= 0)
229  {
230  sLog.outError ("Network.Threads is wrong in your config file");
231  return -1;
232  }
233 
// One slot more than configured: the reactor of m_NetThreads[0] is given to
// the acceptor below, and OnSocketOpen only assigns sockets to indices >= 1.
234  m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
235 
// NOTE(review): listing line 236 is missing from this extract; m_NetThreads is
// indexed below, so it is presumably allocated there -- confirm.
237 
238  sLog.outBasic ("Max allowed socket connections %d", ACE::max_handles());
239 
240  // -1 means use default
241  m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1);
242 
243  m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
244 
245  if (m_SockOutUBuff <= 0)
246  {
247  sLog.outError ("Network.OutUBuff is wrong in your config file");
248  return -1;
249  }
250 
// NOTE(review): listing line 251 is missing from this extract; `acc` is used
// below without a visible declaration, so it is presumably created there.
252  m_Acceptor = acc;
253 
254  ACE_INET_Addr listen_addr (port, address);
255 
// The acceptor is registered with the reactor of network thread 0.
256  if (acc->open(listen_addr, m_NetThreads[0].GetReactor(), ACE_NONBLOCK) == -1)
257  {
258  sLog.outError ("Failed to open acceptor ,check if the port is free");
259  return -1;
260  }
261 
// The return value of each Start() call is not checked.
262  for (size_t i = 0; i < m_NetThreadsCount; ++i)
263  m_NetThreads[i].Start();
264 
265  return 0;
266 }
267 
268 int
269 WorldSocketMgr::StartNetwork (ACE_UINT16 port, const char* address)
270 {
271  if (!sLog.IsOutDebug())
272  ACE_Log_Msg::instance()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS);
273 
274  if (StartReactiveIO(port, address) == -1)
275  return -1;
276 
277  return 0;
278 }
279 
// NOTE(review): the declarator (listing line 281) is missing from this extract,
// so the function's name is not visible here.
// Shuts the network down: closes the acceptor if one exists, calls Stop() on
// every network thread and then calls Wait().
280 void
282 {
283  if (m_Acceptor)
284  {
// NOTE(review): listing line 285 is missing; `acc` is presumably obtained from
// m_Acceptor there, and the null check below guards that result.
286 
287  if (acc)
288  acc->close();
289  }
290 
291  if (m_NetThreadsCount != 0)
292  {
293  for (size_t i = 0; i < m_NetThreadsCount; ++i)
294  m_NetThreads[i].Stop();
295  }
296 
297  Wait();
298 }
299 
// NOTE(review): the declarator (listing line 301) is missing from this extract,
// so the function's name is not visible here.
// Calls Wait() on every network thread in index order; does nothing when
// m_NetThreadsCount is 0.
300 void
302 {
303  if (m_NetThreadsCount != 0)
304  {
305  for (size_t i = 0; i < m_NetThreadsCount; ++i)
306  m_NetThreads[i].Wait();
307  }
308 }
309 
// NOTE(review): the declarator (listing line 311) is missing from this extract;
// the log messages below name this function WorldSocketMgr::OnSocketOpen, and
// the body operates on a `sock` object.
// Applies the configured socket options to `sock`, sets its output buffer
// size and hands it to the network thread with the fewest connections.
// Returns -1 when setting an option fails, otherwise the result of AddSocket().
310 int
312 {
313  // set some options here
// The kernel send buffer is only changed when a non-negative size is set.
314  if (m_SockOutKBuff >= 0)
315  {
// A failure with errno == ENOTSUP is tolerated; any other failure aborts.
316  if (sock->peer().set_option (SOL_SOCKET,
317  SO_SNDBUF,
318  (void*) & m_SockOutKBuff,
319  sizeof (int)) == -1 && errno != ENOTSUP)
320  {
321  sLog.outError ("WorldSocketMgr::OnSocketOpen set_option SO_SNDBUF");
322  return -1;
323  }
324  }
325 
326  static const int ndoption = 1;
327 
328  // Set TCP_NODELAY.
329  if (m_UseNoDelay)
330  {
331  if (sock->peer().set_option (ACE_IPPROTO_TCP,
332  TCP_NODELAY,
333  (void*)&ndoption,
334  sizeof (int)) == -1)
335  {
336  sLog.outError ("WorldSocketMgr::OnSocketOpen: peer().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno));
337  return -1;
338  }
339  }
340 
341  sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff);
342 
343  // we skip the Acceptor Thread
344  size_t min = 1;
345 
// NOTE(review): m_NetThreads[min] with min == 1 is read below, which needs
// m_NetThreadsCount >= 2; this assertion only checks >= 1.
346  ACE_ASSERT (m_NetThreadsCount >= 1);
347 
// Linear scan over indices 1..m_NetThreadsCount-1 for the smallest
// Connections() value; ties keep the lower index.
348  for (size_t i = 1; i < m_NetThreadsCount; ++i)
349  if (m_NetThreads[i].Connections() < m_NetThreads[min].Connections())
350  min = i;
351 
352  return m_NetThreads[min].AddSocket (sock);
353 }
354 
// NOTE(review): the return type and declarator (listing lines 355-356) are
// missing from this extract.
// Returns the WorldSocketMgr object held by
// ACE_Singleton<WorldSocketMgr, ACE_Thread_Mutex>.
357 {
358  return ACE_Singleton<WorldSocketMgr, ACE_Thread_Mutex>::instance();
359 }
360 
long AddReference(void)
#define sConfig
Definition: Config.h:52
int AddSocket(WorldSocket *sock)
DatabaseType WorldDatabase
Accessor to the world database.
Definition: Main.cpp:53
void ThreadEnd()
Definition: Database.cpp:196
ACE_Event_Handler * m_Acceptor
#define sLog
Log class singleton.
Definition: Log.h:187
size_t m_OutBufferSize
Definition: WorldSocket.h:212
ACE_Reactor * GetReactor()
ACE_Thread_Mutex m_NewSockets_Lock
ACE_Reactor * m_Reactor
virtual ~ReactorRunnable()
ReactorRunnable * m_NetThreads
int StartReactiveIO(ACE_UINT16 port, const char *address)
AtomicInt m_Connections
static WorldSocketMgr * Instance()
virtual int svc()
void Guard(void *)
#define DEBUG_LOG(...)
Definition: Log.h:194
void ThreadStart()
Definition: Database.cpp:191
int StartNetwork(ACE_UINT16 port, const char *address)
SocketSet m_NewSockets
ACE_Atomic_Op< ACE_SYNCH_MUTEX, long > AtomicInt
std::set< WorldSocket * > SocketSet
bool IsClosed(void) const
int OnSocketOpen(WorldSocket *sock)
ACE_Acceptor< WorldSocket, ACE_SOCK_ACCEPTOR > Acceptor
Definition: WorldSocket.h:91
virtual ~WorldSocketMgr()
true
Definition: Log.cpp:534
long RemoveReference(void)
size_t m_NetThreadsCount