fastcgi++  3.1alpha
A C++ FastCGI/Web API
manager.cpp
Go to the documentation of this file.
1 
10 /*******************************************************************************
11 * Copyright (C) 2017 Eddie Carle [eddie@isatec.ca] *
12 * *
13 * This file is part of fastcgi++. *
14 * *
15 * fastcgi++ is free software: you can redistribute it and/or modify it under *
16 * the terms of the GNU Lesser General Public License as published by the Free *
17 * Software Foundation, either version 3 of the License, or (at your option) *
18 * any later version. *
19 * *
20 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT ANY *
21 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS *
22 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for *
23 * more details. *
24 * *
25 * You should have received a copy of the GNU Lesser General Public License *
26 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. *
27 *******************************************************************************/
28 
29 #include "fastcgi++/log.hpp"
30 #include "fastcgi++/manager.hpp"
31 
33 
35  m_transceiver(std::bind(
36  &Fastcgipp::Manager_base::push,
37  this,
38  std::placeholders::_1,
39  std::placeholders::_2)),
40  m_terminate(true),
41  m_stop(true),
42  m_threads(threads)
43 #if FASTCGIPP_LOG_LEVEL > 3
44  ,m_requestCount(0),
45  m_maxRequests(0),
46  m_managementRecordCount(0),
47  m_badSocketMessageCount(0),
48  m_badSocketKillCount(0),
49  m_messageCount(0),
50  m_activeThreads(threads),
51  m_maxActiveThreads(0)
52 #endif
53 {
54  if(instance != nullptr)
55  FAIL_LOG("You're not allowed to have multiple manager instances")
56  instance = this;
57  DIAG_LOG("Manager_base::Manager_base(): Initialized")
58 }
59 
61 {
62  std::lock_guard<std::mutex> lock(m_tasksMutex);
63  m_terminate=true;
64  m_transceiver.terminate();
65  m_wake.notify_all();
66 }
67 
69 {
70  std::lock_guard<std::mutex> lock(m_tasksMutex);
71  m_stop=true;
72  m_transceiver.stop();
73  m_wake.notify_all();
74 }
75 
77 {
78  std::lock_guard<std::mutex> lock(m_tasksMutex);
79  DIAG_LOG("Starting fastcgi++ manager")
80  m_stop=false;
81  m_terminate=false;
82  m_transceiver.start();
83  for(auto& thread: m_threads)
84  if(!thread.joinable())
85  {
86  std::thread newThread(&Fastcgipp::Manager_base::handler, this);
87  thread.swap(newThread);
88  }
89 }
90 
92 {
93  for(auto& thread: m_threads)
94  if(thread.joinable())
95  thread.join();
96  m_transceiver.join();
97 }
98 
99 #include <signal.h>
101 {
102  struct sigaction sigAction;
103  sigAction.sa_handler=Fastcgipp::Manager_base::signalHandler;
104  sigemptyset(&sigAction.sa_mask);
105  sigAction.sa_flags=0;
106 
107  sigaction(SIGPIPE, &sigAction, NULL);
108  sigaction(SIGUSR1, &sigAction, NULL);
109  sigaction(SIGTERM, &sigAction, NULL);
110 }
111 
113 {
114  switch(signum)
115  {
116  case SIGUSR1:
117  {
118  if(instance)
119  {
120  DIAG_LOG("Received SIGUSR1. Stopping fastcgi++ manager.")
121  instance->stop();
122  }
123  else
124  WARNING_LOG("Received SIGUSR1 but fastcgi++ manager isn't "\
125  "running")
126  break;
127  }
128  case SIGTERM:
129  {
130  if(instance)
131  {
132  DIAG_LOG("Received SIGTERM. Terminating fastcgi++ manager.")
133  instance->terminate();
134  }
135  else
136  WARNING_LOG("Received SIGTERM but fastcgi++ manager isn't "\
137  "running")
138  break;
139  }
140  }
141 }
142 
144 {
145  Message message;
146  Socket socket;
147  {
148  std::lock_guard<std::mutex> lock(m_messagesMutex);
149  message = std::move(m_messages.front().first);
150  socket = m_messages.front().second;
151  m_messages.pop();
152  }
153 
154  if(message.type == 0)
155  {
156  const Protocol::Header& header
157  = *reinterpret_cast<Protocol::Header*>(message.data.begin());
158  switch(header.type)
159  {
161  {
162  const char* name;
163  const char* value;
164  const char* end;
165 
167  message.data.begin()+sizeof(header),
168  message.data.end(),
169  name,
170  value,
171  end))
172  {
173  switch(value-name)
174  {
175  case 14:
176  {
177  if(std::equal(name, value, "FCGI_MAX_CONNS"))
178  {
179  Block record(
180  reinterpret_cast<const char*>(
182  sizeof(Protocol::maxConnsReply));
183  m_transceiver.send(
184  socket,
185  std::move(record),
186  false);
187  }
188  break;
189  }
190  case 13:
191  {
192  if(std::equal(name, value, "FCGI_MAX_REQS"))
193  {
194  Block record(
195  reinterpret_cast<const char*>(
197  sizeof(Protocol::maxReqsReply));
198  m_transceiver.send(
199  socket,
200  std::move(record),
201  false);
202  }
203  break;
204  }
205  case 15:
206  {
207  if(std::equal(name, value, "FCGI_MPXS_CONNS"))
208  {
209  Block record(
210  reinterpret_cast<const char*>(
212  sizeof(Protocol::mpxsConnsReply));
213  m_transceiver.send(
214  socket,
215  std::move(record),
216  false);
217  }
218  break;
219  }
220  }
221  }
222  break;
223  }
224 
225  default:
226  {
227  Block record(
228  sizeof(Protocol::Header)
229  +sizeof(Protocol::UnknownType));
230 
231  Protocol::Header& sendHeader
232  = *reinterpret_cast<Protocol::Header*>(record.begin());
233  sendHeader.version = Protocol::version;
235  sendHeader.fcgiId = 0;
236  sendHeader.contentLength = sizeof(Protocol::UnknownType);
237  sendHeader.paddingLength = 0;
238 
239  Protocol::UnknownType& sendBody
240  = *reinterpret_cast<Protocol::UnknownType*>(
241  record.begin()+sizeof(header));
242  sendBody.type = header.type;
243 
244  m_transceiver.send(socket, std::move(record), false);
245 
246  break;
247  }
248  }
249  }
250  else
251  ERROR_LOG("Got a non-FastCGI record destined for the manager")
252 }
253 
255 {
256  std::unique_lock<std::shared_timed_mutex> requestsWriteLock(
257  m_requestsMutex,
258  std::defer_lock);
259  std::unique_lock<std::mutex> tasksLock(m_tasksMutex);
260  std::shared_lock<std::shared_timed_mutex> requestsReadLock(m_requestsMutex);
261 
262  while(!m_terminate && !(m_stop && m_requests.empty()))
263  {
264  requestsReadLock.unlock();
265  while(!m_tasks.empty())
266  {
267  auto id = m_tasks.front();
268  m_tasks.pop();
269  tasksLock.unlock();
270 
271  if(id.m_id == 0)
272  localHandler();
273  else
274  {
275  requestsReadLock.lock();
276  auto request = m_requests.find(id);
277  if(request != m_requests.end())
278  {
279  std::unique_lock<std::mutex> requestLock(
280  request->second->mutex,
281  std::try_to_lock);
282  requestsReadLock.unlock();
283 
284  if(requestLock)
285  {
286  auto lock = request->second->handler();
287  if(!lock || !id.m_socket.valid())
288  {
289 #if FASTCGIPP_LOG_LEVEL > 3
290  if(!id.m_socket.valid())
291  ++m_badSocketKillCount;
292 #endif
293  if(lock)
294  lock.unlock();
295  requestsWriteLock.lock();
296  requestLock.unlock();
297  m_requests.erase(request);
298  requestsWriteLock.unlock();
299  }
300  else
301  {
302  requestLock.unlock();
303  lock.unlock();
304  }
305  }
306  }
307  else
308  requestsReadLock.unlock();
309  }
310  tasksLock.lock();
311  }
312 
313  requestsReadLock.lock();
314  if(m_terminate || (m_stop && m_requests.empty()))
315  break;
316  requestsReadLock.unlock();
317 #if FASTCGIPP_LOG_LEVEL > 3
318  --m_activeThreads;
319 #endif
320  m_wake.wait(tasksLock);
321 #if FASTCGIPP_LOG_LEVEL > 3
322  if(!m_stop && !m_terminate)
323  {
324  ++m_activeThreads;
325  m_maxActiveThreads = std::max(m_activeThreads, m_maxActiveThreads);
326  }
327 #endif
328  requestsReadLock.lock();
329  }
330 }
331 
333 {
334  if(id.m_id == 0)
335  {
336 #if FASTCGIPP_LOG_LEVEL > 3
337  ++m_managementRecordCount;
338 #endif
339  std::lock_guard<std::mutex> lock(m_messagesMutex);
340  m_messages.push(std::make_pair(std::move(message), id.m_socket));
341  }
342  else if(id.m_id == Protocol::badFcgiId)
343  {
344 #if FASTCGIPP_LOG_LEVEL > 3
345  ++m_badSocketMessageCount;
346 #endif
347  std::lock_guard<std::shared_timed_mutex> lock(m_requestsMutex);
348  const auto range = m_requests.equal_range(id.m_socket);
349  auto request = range.first;
350  while(request != range.second)
351  {
352  std::unique_lock<std::mutex> lock(
353  request->second->mutex,
354  std::try_to_lock);
355  if(lock)
356  {
357  lock.unlock();
358  request = m_requests.erase(request);
359 #if FASTCGIPP_LOG_LEVEL > 3
360  ++m_badSocketKillCount;
361 #endif
362  }
363  else
364  ++request;
365  }
366  return;
367  }
368  else
369  {
370 #if FASTCGIPP_LOG_LEVEL > 3
371  ++m_messageCount;
372 #endif
373  std::unique_lock<std::shared_timed_mutex> lock(m_requestsMutex);
374  auto request = m_requests.find(id);
375  if(request == m_requests.end())
376  {
377  if(message.type == 0)
378  {
379  const Protocol::Header& header=
380  *reinterpret_cast<Protocol::Header*>(message.data.begin());
382  {
383  const Protocol::BeginRequest& body
384  = *reinterpret_cast<Protocol::BeginRequest*>(
385  message.data.begin()
386  +sizeof(header));
387 
388  request = m_requests.emplace(
389  std::piecewise_construct,
390  std::forward_as_tuple(id),
391  std::forward_as_tuple()).first;
392 
393  request->second = makeRequest(
394  id,
395  body.role,
396  body.kill());
397  lock.unlock();
398 #if FASTCGIPP_LOG_LEVEL > 3
399  ++m_requestCount;
400  m_maxRequests = std::max(m_maxRequests, m_requests.size());
401 #endif
402  }
403  else
404  WARNING_LOG("Got a non BEGIN_REQUEST record for a request"\
405  " that doesn't exist")
406  }
407  return;
408  }
409  else
410  request->second->push(std::move(message));
411  }
412  std::lock_guard<std::mutex> lock(m_tasksMutex);
413  m_tasks.push(id);
414  m_wake.notify_one();
415 }
416 
418 {
419  if(m_stop)
420  {
421  m_threads.resize(threads);
422 #if FASTCGIPP_LOG_LEVEL > 3
423  m_activeThreads = threads;
424 #endif
425  }
426 }
427 
429 {
430  instance=nullptr;
431  terminate();
432  DIAG_LOG("Manager_base::~Manager_base(): New requests ============== " \
433  << m_requestCount)
434  DIAG_LOG("Manager_base::~Manager_base(): Max concurrent requests === " \
435  << m_maxRequests)
436  DIAG_LOG("Manager_base::~Manager_base(): Management records ======== " \
437  << m_managementRecordCount)
438  DIAG_LOG("Manager_base::~Manager_base(): Bad socket messages ======= " \
439  << m_badSocketMessageCount)
440  DIAG_LOG("Manager_base::~Manager_base(): Bad socket request kills == " \
441  << m_badSocketKillCount)
442  DIAG_LOG("Manager_base::~Manager_base(): Request messages received = " \
443  << m_messageCount)
444  DIAG_LOG("Manager_base::~Manager_base(): Maximum active threads ==== " \
445  << m_maxActiveThreads)
446  DIAG_LOG("Manager_base::~Manager_base(): Remaining requests ======== " \
447  << m_requests.size())
448  DIAG_LOG("Manager_base::~Manager_base(): Remaining tasks =========== " \
449  << m_tasks.size())
450  DIAG_LOG("Manager_base::~Manager_base(): Remaining local messages == " \
451  << m_messages.size())
452 }
FAIL_LOG
#define FAIL_LOG(data)
Log any "errors" that cannot be recovered from and then exit.
Definition: log.hpp:91
Fastcgipp::Manager_base::setupSignals
static void setupSignals()
Configure the handlers for POSIX signals.
Definition: manager.cpp:100
Fastcgipp::Message::type
int type
Type of message. A 0 means FastCGI record. Anything else is open.
Definition: message.hpp:106
Fastcgipp::Protocol::Header::version
uint8_t version
FastCGI version number.
Definition: protocol.hpp:208
Fastcgipp::Manager_base::stop
void stop()
Call from any thread to stop the Manager.
Definition: manager.cpp:68
FASTCGIPP_LOG_LEVEL
#define FASTCGIPP_LOG_LEVEL
Definition: config.hpp:7
Fastcgipp::Block::end
char * end()
Pointer to 1+ the last element.
Definition: block.hpp:149
Fastcgipp::Manager_base::start
void start()
Call from any thread to start the Manager.
Definition: manager.cpp:76
Fastcgipp::Manager_base::localHandler
void localHandler()
Handles management messages.
Definition: manager.cpp:143
Fastcgipp::Block
Data structure to hold a block of raw data.
Definition: block.hpp:61
Fastcgipp::Manager_base::terminate
void terminate()
Call from any thread to terminate the Manager.
Definition: manager.cpp:60
Fastcgipp::Block::begin
char * begin()
Pointer to the first element.
Definition: block.hpp:137
manager.hpp
Declares the Manager class.
Fastcgipp::Manager_base::instance
static Manager_base * instance
Pointer to the Manager object.
Definition: manager.hpp:289
Fastcgipp::Protocol::RecordType::UNKNOWN_TYPE
@ UNKNOWN_TYPE
Fastcgipp::Message
Data structure used to pass messages to requests.
Definition: message.hpp:63
Fastcgipp::Manager_base::signalHandler
static void signalHandler(int signum)
General function to handler POSIX signals.
Definition: manager.cpp:112
Fastcgipp::Manager_base::handler
void handler()
General handling function to have it's own thread.
Definition: manager.cpp:254
Fastcgipp::Manager_base::Manager_base
Manager_base(unsigned threads)
Sole constructor.
Definition: manager.cpp:34
Fastcgipp::Manager_base::push
void push(Protocol::RequestId id, Message &&message)
Pass a message to a request.
Definition: manager.cpp:332
Fastcgipp::Protocol::maxConnsReply
const ManagementReply< 14, 2 > maxConnsReply
The maximum allowed file descriptors open at a time.
Fastcgipp::Protocol::Header::fcgiId
BigEndian< FcgiId > fcgiId
Request ID.
Definition: protocol.hpp:214
Fastcgipp::Manager_base::resizeThreads
void resizeThreads(unsigned threads)
Call before start to change the number of threads.
Definition: manager.cpp:417
Fastcgipp::Message::data
Block data
The raw data being passed along with the message.
Definition: message.hpp:109
Fastcgipp::Protocol::Header::paddingLength
uint8_t paddingLength
Length of record padding.
Definition: protocol.hpp:220
Fastcgipp::Protocol::RecordType::BEGIN_REQUEST
@ BEGIN_REQUEST
Fastcgipp::Protocol::Header::contentLength
BigEndian< uint16_t > contentLength
Content length.
Definition: protocol.hpp:217
Fastcgipp::Protocol::BeginRequest::kill
constexpr bool kill() const noexcept
Get keep alive value from the record body.
Definition: protocol.hpp:251
Fastcgipp::Protocol::UnknownType::type
RecordType type
Unknown record type.
Definition: protocol.hpp:279
Fastcgipp::Protocol::BeginRequest
The body for FastCGI records with a RecordType of BEGIN_REQUEST.
Definition: protocol.hpp:236
Fastcgipp::Protocol::RecordType::GET_VALUES
@ GET_VALUES
Fastcgipp::Manager_base::~Manager_base
~Manager_base()
Definition: manager.cpp:428
Fastcgipp::Protocol::Header
Data structure used as the header for FastCGI records.
Definition: protocol.hpp:205
WARNING_LOG
#define WARNING_LOG(data)
Log any externally caused "errors".
Definition: log.hpp:124
Fastcgipp::Socket
Class for representing an OS level I/O socket.
Definition: sockets.hpp:79
ERROR_LOG
#define ERROR_LOG(data)
Log any "errors" that can be recovered from.
Definition: log.hpp:107
Fastcgipp::Protocol::Header::type
RecordType type
Record type.
Definition: protocol.hpp:211
Fastcgipp::Protocol::UnknownType
The body for FastCGI records with a RecordType of UNKNOWN_TYPE.
Definition: protocol.hpp:276
Fastcgipp
Topmost namespace for the fastcgi++ library.
Definition: fcgistreambuf.cpp:34
log.hpp
Declares the Fastcgipp debugging/logging facilities.
Fastcgipp::Manager_base::join
void join()
Block until a stop() or terminate() is called and completed.
Definition: manager.cpp:91
Fastcgipp::Protocol::badFcgiId
constexpr uint16_t badFcgiId
Constant that defines a bad/special FcgiId.
Definition: protocol.hpp:77
Fastcgipp::Protocol::version
static constexpr int version
The version of the FastCGI protocol that this adheres to.
Definition: protocol.hpp:174
Fastcgipp::Protocol::RequestId
A unique identifier for each FastCGI request.
Definition: protocol.hpp:88
DIAG_LOG
#define DIAG_LOG(data)
Definition: log.hpp:158
Fastcgipp::Protocol::BeginRequest::role
BigEndian< Role > role
Role.
Definition: protocol.hpp:257
Fastcgipp::Protocol::processParamHeader
bool processParamHeader(const char *data, const char *const dataEnd, const char *&name, const char *&value, const char *&end)
Process the body of a FastCGI record of type RecordType::PARAMS.
Definition: protocol.cpp:32
Fastcgipp::Manager_base
General task and protocol management class base.
Definition: manager.hpp:81
Fastcgipp::Logging::header
void header(Level level)
Send a log header to logstream.
Definition: log.cpp:107
Fastcgipp::Protocol::maxReqsReply
const ManagementReply< 13, 2 > maxReqsReply
The maximum allowed requests at a time.
Fastcgipp::Protocol::mpxsConnsReply
const ManagementReply< 15, 1 > mpxsConnsReply
Where or not requests can be multiplexed over a single connections.