fastcgi++  3.1alpha
A C++ FastCGI/Web API
connection.cpp
Go to the documentation of this file.
1 
10 /*******************************************************************************
11 * Copyright (C) 2018 Eddie Carle [eddie@isatec.ca] *
12 * *
13 * This file is part of fastcgi++. *
14 * *
15 * fastcgi++ is free software: you can redistribute it and/or modify it under *
16 * the terms of the GNU Lesser General Public License as published by the Free *
17 * Software Foundation, either version 3 of the License, or (at your option) *
18 * any later version. *
19 * *
20 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT ANY *
21 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS *
22 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for *
23 * more details. *
24 * *
25 * You should have received a copy of the GNU Lesser General Public License *
26 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. *
27 *******************************************************************************/
28 
29 #include <postgres.h>
30 #include <libpq-fe.h>
31 #include <catalog/pg_type.h>
32 #undef ERROR
33 #undef WARNING
34 #undef INFO
35 
37 #include "fastcgi++/log.hpp"
38 
39 #include <unistd.h>
40 #include <cstring>
41 
43 {
44  killAll();
45  while(!m_terminate && !(m_stop && m_queue.empty()))
46  {
47  // Get connected
48  if(!connected()) connect();
49 
50  // Do we have a free connection?
51  for(
52  auto connection=m_connections.begin();
53  connection != m_connections.end();
54  ++connection)
55  {
56  auto& idle = connection->second.idle;
57  if(idle)
58  {
59  auto& conn = reinterpret_cast<PGconn*&>(
60  connection->second.connection);
61  auto& query = connection->second.query;
62 
63  {
64  std::lock_guard<std::mutex> lock(m_mutex);
65  if(m_queue.empty())
66  break;
67  query = m_queue.front();
68  m_queue.pop_front();
69  }
70 
71  int result;
72  if(query.parameters)
73  result = PQsendQueryParams(
74  conn,
75  query.statement,
76  query.parameters->size(),
77  query.parameters->oids(),
78  query.parameters->raws(),
79  query.parameters->sizes(),
80  query.parameters->formats(),
81  1);
82  else
83  result = PQsendQueryParams(
84  conn,
85  query.statement,
86  0,
87  nullptr,
88  nullptr,
89  nullptr,
90  nullptr,
91  1);
92 
93  if(result != 1)
94  {
95  ERROR_LOG("Unable to dispatch SQL query: " \
96  << PQerrorMessage(conn))
97  kill(connection);
98  }
99  else
100  {
101  PQflush(conn);
102  idle = false;
103  }
104  }
105  }
106 
107 
108  // Let's see if any data is waiting for us from the connections
109  const auto pollResult = m_poll.poll(connected()?-1:m_retry);
110  if(pollResult)
111  {
112  if(pollResult.socket() == m_wakeSockets[1])
113  {
114  // Looks like it's time to wake up
115  if(pollResult.onlyIn())
116  {
117  static char x[256];
118  if(read(m_wakeSockets[1], x, 256)<1)
119  FAIL_LOG("Unable to read out of SQL::Connection wakeup socket: " << \
120  std::strerror(errno))
121  continue;
122  }
123  else if(pollResult.hup() || pollResult.rdHup())
124  FAIL_LOG("The wakeup socket in SQL::Connection hung up.")
125  else if(pollResult.err())
126  FAIL_LOG("Error in the SQL::Connection wakeup socket.")
127  }
128  else
129  {
130  auto connection = m_connections.find(pollResult.socket());
131  if(connection == m_connections.end())
132  {
133  ERROR_LOG("Poll gave fd " << pollResult.socket() \
134  << " which isn't in m_connections.")
135  m_poll.del(pollResult.socket());
136  close(pollResult.socket());
137  continue;
138  }
139 
140  if(pollResult.in())
141  {
142  auto& conn = reinterpret_cast<PGconn*&>(
143  connection->second.connection);
144  auto& query = connection->second.query;
145  auto& idle = connection->second.idle;
146  if(idle)
147  ERROR_LOG("Recieved input data on SQL connection for "\
148  "which there is no active query")
149  else if(PQconsumeInput(conn) != 1)
150  ERROR_LOG("Error consuming SQL input: " \
151  << PQerrorMessage(conn))
152  else
153  {
154  PQflush(conn);
155  while(PQisBusy(conn) == 0)
156  {
157  PGresult* result = PQgetResult(conn);
158  if(result == nullptr)
159  {
160  // Query is complete
161  idle = true;
162  query.statement = nullptr;
163  query.parameters.reset();
164  query.results.reset();
165  if(query.callback)
166  {
167  query.callback(m_messageType);
168  query.callback=std::function<void(Message)>();
169  }
170  break;
171  }
172 
173  if(query.results->m_res == nullptr)
174  query.results->m_res = result;
175  else
176  {
177  WARNING_LOG("Multiple result sets received on"\
178  " query. Discarding extras.")
179  PQclear(result);
180  }
181  }
182  continue;
183  }
184  }
185 
186  if(pollResult.rdHup())
187  WARNING_LOG("SQL::Connection socket " \
188  << pollResult.socket() << " remotely hung up. " \
189  << "Reconnecting.")
190  else if(pollResult.hup())
191  WARNING_LOG("SQL::Connection socket " \
192  << pollResult.socket() << " hung up. Reconnecting")
193  else if(pollResult.err())
194  ERROR_LOG("Error in SQL::Connection socket " \
195  << pollResult.socket() << ". Reconnecting")
196  else
197  FAIL_LOG("Got a weird event 0x" << std::hex \
198  << pollResult.events() \
199  << " on SQL::Connection poll." )
200  kill(connection);
201  }
202  }
203  }
204  killAll();
205 }
206 
208 {
209  m_stop=true;
210  wake();
211 }
212 
214 {
215  m_terminate=true;
216  wake();
217 }
218 
220 {
221  if(!m_thread.joinable())
222  {
223  m_stop=false;
224  m_terminate=false;
225  std::thread thread(&Fastcgipp::SQL::Connection::handler, this);
226  m_thread.swap(thread);
227  }
228 }
229 
231 {
232  if(m_thread.joinable())
233  m_thread.join();
234 }
235 
237 {
238  killAll();
239 }
240 
242 {
243  static const char x=0;
244  if(write(m_wakeSockets[0], &x, 1) != 1)
245  FAIL_LOG("Unable to write to wakeup socket in SQL::Connection: " \
246  << std::strerror(errno))
247 }
248 
250  const char* host,
251  const char* db,
252  const char* username,
253  const char* password,
254  const unsigned concurrency,
255  const unsigned short port,
256  int messageType,
257  unsigned retryInterval)
258 {
259  if(!m_initialized)
260  {
261  socketpair(AF_UNIX, SOCK_STREAM, 0, m_wakeSockets);
262  m_poll.add(m_wakeSockets[1]);
263  m_host = host;
264  m_db = db;
265  m_username = username;
266  m_password = password;
267  m_concurrency = concurrency;
268  m_port = std::to_string(port);
269  m_messageType = messageType;
270  m_retry = retryInterval*1000;
271  m_initialized = true;
272  }
273 }
274 
276 {
277  if(!m_stop && connected())
278  {
279  if(query.parameters)
280  query.parameters->build();
281 
282  std::lock_guard<std::mutex> lock(m_mutex);
283  m_queue.push_back(query);
284  wake();
285  return true;
286  }
287  return false;
288 }
289 
291 {
292  while(!connected())
293  {
294  Conn conn;
295  conn.connection = PQsetdbLogin(
296  m_host.c_str(),
297  m_port.c_str(),
298  nullptr,
299  nullptr,
300  m_db.c_str(),
301  m_username.c_str(),
302  m_password.c_str());
303  auto& connection = reinterpret_cast<PGconn*&>(conn.connection);
304  if(connection == nullptr)
305  {
306  ERROR_LOG("Error initiating connection to postgresql server: " \
307  << PQerrorMessage(connection))
308  break;
309  }
310  if(PQstatus(connection) != CONNECTION_OK)
311  {
312  ERROR_LOG("Error connecting to postgresql server: " \
313  << PQerrorMessage(connection))
314  PQfinish(connection);
315  break;
316  }
317  if(PQsetnonblocking(connection, 1) != 0)
318  {
319  ERROR_LOG("Error setting nonblock on postgresql connection: " \
320  << PQerrorMessage(connection))
321  PQfinish(connection);
322  break;
323  }
324 
325  conn.idle = true;
326  const auto socket = PQsocket(connection);
327  m_poll.add(socket);
328  m_connections[socket] = conn;
329  }
330 }
331 
332 void Fastcgipp::SQL::Connection::kill(std::map<socket_t, Conn>::iterator& conn)
333 {
334  PQfinish(reinterpret_cast<PGconn*>(conn->second.connection));
335  m_poll.del(conn->first);
336  if(!conn->second.idle)
337  {
338  std::lock_guard<std::mutex> lock(m_mutex);
339  m_queue.push_front(conn->second.query);
340  }
341  m_connections.erase(conn);
342 }
343 
345 {
346  for(auto& connection: m_connections)
347  {
348  PQfinish(reinterpret_cast<PGconn*>(connection.second.connection));
349  m_poll.del(connection.first);
350  }
351  m_connections.clear();
352  std::lock_guard<std::mutex> lock(m_mutex);
353  m_queue.clear();
354 }
FAIL_LOG
#define FAIL_LOG(data)
Log any "errors" that cannot be recovered from and then exit.
Definition: log.hpp:91
Fastcgipp::Poll::poll
Result poll(int timeout)
Initiate poll on group.
Definition: poll.cpp:69
Fastcgipp::SQL::Connection::m_wakeSockets
socket_t m_wakeSockets[2]
A pair of sockets for wakeup purposes.
Definition: connection.hpp:230
Fastcgipp::SQL::Connection::m_queue
std::deque< Query > m_queue
Buffer for transmitting data
Definition: connection.hpp:215
Fastcgipp::SQL::Connection::handler
void handler()
General connection handler.
Definition: connection.cpp:42
Fastcgipp::SQL::Query
Structure to hold SQL query data.
Definition: connection.hpp:85
Fastcgipp::SQL::Connection::connect
void connect()
Call this to initiate all connections with the server.
Definition: connection.cpp:290
Fastcgipp::SQL::Connection::m_mutex
std::mutex m_mutex
Always practice safe threading.
Definition: connection.hpp:227
Fastcgipp::SQL::Connection::m_connections
std::map< socket_t, Conn > m_connections
Container associating sockets with their receive buffers.
Definition: connection.hpp:206
Fastcgipp::SQL::Connection::~Connection
~Connection()
Definition: connection.cpp:236
Fastcgipp::SQL::Connection::queue
bool queue(const Query &query)
Queue up a query.
Definition: connection.cpp:275
Fastcgipp::SQL::Connection::m_stop
std::atomic_bool m_stop
True when handler() should be stopping.
Definition: connection.hpp:221
Fastcgipp::SQL::Connection::Conn
Definition: connection.hpp:198
Fastcgipp::SQL::Connection::Conn::connection
void * connection
Definition: connection.hpp:201
connection.hpp
Declares the Fastcgipp::SQL::Connection class.
Fastcgipp::SQL::Query::parameters
std::shared_ptr< Parameters_base > parameters
Parameters
Definition: connection.hpp:108
WARNING_LOG
#define WARNING_LOG(data)
Log any externally caused "errors".
Definition: log.hpp:124
Fastcgipp::Poll::del
bool del(const socket_t socket)
Remove a socket identifier from the poll list.
Definition: poll.cpp:139
ERROR_LOG
#define ERROR_LOG(data)
Log any "errors" that can be recovered from.
Definition: log.hpp:107
Fastcgipp::SQL::Connection::Conn::idle
bool idle
Definition: connection.hpp:200
Fastcgipp::SQL::Connection::stop
void stop()
Call from any thread to stop the handler() thread.
Definition: connection.cpp:207
Fastcgipp::SQL::Connection::killAll
void killAll()
Kill and destroy everything!!
Definition: connection.cpp:344
Fastcgipp::SQL::Connection::wake
void wake()
Call this to wakeup the thread if it's sleeping.
Definition: connection.cpp:241
Fastcgipp::SQL::Connection::m_poll
Poll m_poll
The poll group.
Definition: connection.hpp:257
Fastcgipp::SQL::Connection::m_terminate
std::atomic_bool m_terminate
True when handler() should be terminating.
Definition: connection.hpp:218
Fastcgipp::SQL::Connection::m_retry
unsigned m_retry
Connection retry interval
Definition: connection.hpp:248
Fastcgipp::SQL::Connection::join
void join()
Block until a stop() or terminate() is completed
Definition: connection.cpp:230
Fastcgipp::SQL::Connection::init
void init(const char *host, const char *db, const char *username, const char *password, const unsigned concurrency=1, const unsigned short port=5432, int messageType=5432, unsigned retryInterval=30)
Initialize the connection.
Definition: connection.cpp:249
Fastcgipp::SQL::Connection::connected
bool connected()
Are we fully connected?
Definition: connection.hpp:190
log.hpp
Declares the Fastcgipp debugging/logging facilities.
Fastcgipp::SQL::Connection::terminate
void terminate()
Call from any thread to terminate the handler() thread.
Definition: connection.cpp:213
Fastcgipp::SQL::Connection::kill
void kill(std::map< socket_t, Conn >::iterator &conn)
Kill and destroy this specific connection.
Definition: connection.cpp:332
Fastcgipp::SQL::Connection::start
void start()
Call from any thread to start the handler() thread.
Definition: connection.cpp:219
Fastcgipp::SQL::Connection::m_messageType
unsigned m_messageType
Callback message type ID.
Definition: connection.hpp:254