1 /**
2  * PostgreSQL protocol implementation.
3  * !!! UNFINISHED !!!
4  *
5  * License:
6  *   This Source Code Form is subject to the terms of
7  *   the Mozilla Public License, v. 2.0. If a copy of
8  *   the MPL was not distributed with this file, You
9  *   can obtain one at http://mozilla.org/MPL/2.0/.
10  *
11  * Authors:
12  *   Vladimir Panteleev <vladimir@thecybershadow.net>
13  */
14 
15 module ae.net.db.psql;
16 
17 import std.array;
18 import std.exception;
19 import std.string;
20 
21 import std.bitmanip : nativeToBigEndian, bigEndianToNative;
22 
23 import ae.net.asockets;
24 import ae.utils.array;
25 import ae.utils.exception;
26 
/**
 * Asynchronous PostgreSQL client connection (frontend side of the
 * version 3.0 wire protocol), layered on top of an `IConnection`.
 *
 * The implementation is unfinished: only the startup handshake, error
 * reporting, parameter status and ready-for-query notifications are
 * handled. Any other backend message results in a `PgSqlException`.
 */
class PgSqlConnection
{
public:
	/**
	 * Attaches the protocol handler to a connection.
	 *
	 * The connect and read-data handlers of `conn` are replaced with
	 * this object's handlers; the startup message is sent as soon as
	 * the connection reports that it is connected.
	 *
	 * Params:
	 *   conn     = underlying transport
	 *   user     = value sent as the "user" startup parameter
	 *   database = value sent as the "database" startup parameter
	 */
	this(IConnection conn, string user, string database)
	{
		this.conn = conn;
		this.user = user;
		this.database = database;

		conn.handleConnect = &onConnect;
		conn.handleReadData = &onReadData;
	}

	/// Parsed contents of an ErrorResponse ('E') packet.
	struct ErrorResponse
	{
		/// One field of the response: a one-byte type code and its text.
		struct Field
		{
			char type;
			char[] str;

			/// Formats the field as "<type>=<text>".
			string toString() { return "%s=%s".format(type, str); }
		}
		Field[] fields;

		/// Formats all fields, separated by semicolons.
		string toString()
		{
			return "%-(%s;%)".format(fields);
		}
	}

	/// Status byte carried by a ReadyForQuery ('Z') packet.
	enum TransactionStatus : char
	{
		idle = 'I',
		inTransaction = 'T',
		failed = 'E',
	}

	/// Description of one result column.
	/// Note: RowDescription parsing is not implemented yet, so this
	/// structure is currently never populated by this class.
	struct FieldDescription
	{
		char[] name;
		uint tableID;
		uint type;
		short size;
		uint modifier;
		ushort formatCode;
	}

	/// Called for every ErrorResponse packet. When unset, the error is
	/// thrown as a `PgSqlException` instead.
	void delegate(ErrorResponse response) handleError;
	/// Called when the server sends an authentication request with code 0.
	void delegate() handleAuthenticated;
	/// Called for every ParameterStatus packet with the parameter name and value.
	void delegate(char[] name, char[] value) handleParameterStatus;
	/// Called for every ReadyForQuery packet.
	void delegate(TransactionStatus transactionStatus) handleReadyForQuery;

	/// Value sent as the "application_name" startup parameter.
	string applicationName = "ae.net.db.psql";

private:
	IConnection conn;

	string user;
	string database;

	enum ushort protocolVersionMajor = 3;
	enum ushort protocolVersionMinor = 0;

	/// Size of the fixed packet header: one type byte + 32-bit length.
	enum size_t packetHeaderSize = 1 + uint.sizeof;

	/// Backend message type bytes understood by `processPacket`.
	enum PacketType : char
	{
		authenticationRequest = 'R',
		backendKeyData = 'K',
		errorResponse = 'E',
		parameterStatus = 'S',
		readyForQuery = 'Z',
		rowDescription = 'T',
	}

	/// Reads a big-endian integer of type `T` from the start of `data`
	/// and advances `data` past it.
	/// Throws: `PgSqlException` if `data` is shorter than `T.sizeof`.
	static T readInt(T)(ref Data data)
	{
		enforce!PgSqlException(data.length >= T.sizeof, "Not enough data in packet");
		T result = bigEndianToNative!T(cast(ubyte[T.sizeof])data.contents[0..T.sizeof]);
		data = data[T.sizeof..$];
		return result;
	}

	/// Reads a single byte from the start of `data` as a `char`.
	static char readChar(ref Data data)
	{
		return cast(char)readInt!ubyte(data);
	}

	/// Reads a NUL-terminated string from the start of `data` and
	/// advances `data` past the terminator. The result is a slice of
	/// the packet contents (no copy is made).
	/// Throws: `PgSqlException` if no terminator is found.
	static char[] readString(ref Data data)
	{
		char[] s = cast(char[])data.contents;
		auto p = s.indexOf('\0');
		enforce!PgSqlException(p >= 0, "Unterminated string in packet");
		char[] result = s[0..p];
		data = data[p+1..$];
		return result;
	}

	/// Connection established - begin the protocol handshake.
	void onConnect()
	{
		sendStartupMessage();
	}

	/// Bytes received so far which do not yet form a complete packet.
	Data packetBuf;

	/// Accumulates incoming bytes and dispatches every complete packet
	/// found in the buffer. Incomplete trailing data is kept until the
	/// next call.
	/// Throws: `PgSqlException` if a packet declares a length smaller
	/// than the length field itself.
	void onReadData(Data data)
	{
		packetBuf ~= data;
		while (packetBuf.length >= packetHeaderSize)
		{
			// The length field counts itself but not the type byte.
			auto length = { Data temp = packetBuf[1..5]; return readInt!uint(temp); }();
			enforce!PgSqlException(length >= uint.sizeof, "Invalid packet length");

			// The packet is not fully received yet: stop and wait for
			// more data. (Without this, the loop would spin forever.)
			// Written as a subtraction so that it cannot overflow;
			// packetBuf.length is at least packetHeaderSize here.
			if (packetBuf.length - 1 < length)
				break;

			immutable size_t packetEnd = 1 + cast(size_t)length;
			auto packetData = packetBuf[0 .. packetEnd];
			packetBuf = packetBuf[packetEnd .. $];
			if (!packetBuf.length)
				packetBuf = Data.init;

			auto packetType = cast(PacketType)readChar(packetData);
			packetData = packetData[4..$]; // Skip length
			processPacket(packetType, packetData);
		}
	}

	/// Handles one complete backend packet.
	/// Params:
	///   type = packet type byte
	///   data = packet payload (without the type byte and length field)
	/// Throws: `PgSqlException` on malformed or unsupported packets, on
	/// a non-zero authentication request code, and on an ErrorResponse
	/// when no `handleError` is set.
	void processPacket(PacketType type, Data data)
	{
		switch (type)
		{
			case PacketType.authenticationRequest:
			{
				// Code 0 is AuthenticationOk. Any other code asks for an
				// authentication exchange, which is not implemented here.
				auto result = readInt!uint(data);
				enforce!PgSqlException(result == 0, "Authentication failed");
				if (handleAuthenticated)
					handleAuthenticated();
				break;
			}
			case PacketType.backendKeyData:
			{
				// TODO?
				break;
			}
			case PacketType.errorResponse:
			{
				// Sequence of (type byte, NUL-terminated string) pairs,
				// ended by a zero type byte.
				ErrorResponse response;
				while (data.length)
				{
					auto fieldType = readChar(data);
					if (!fieldType)
						break;
					response.fields ~= ErrorResponse.Field(fieldType, readString(data));
				}
				if (handleError)
					handleError(response);
				else
					throw new PgSqlException(response.toString());
				break;
			}
			case PacketType.parameterStatus:
				if (handleParameterStatus)
				{
					char[] name = readString(data);
					char[] value = readString(data);
					handleParameterStatus(name, value);
				}
				break;
			case PacketType.readyForQuery:
				if (handleReadyForQuery)
					handleReadyForQuery(cast(TransactionStatus)readChar(data));
				break;
			case PacketType.rowDescription:
			{
				// TODO: unfinished - the field descriptions are not
				// parsed yet and nothing is reported to the caller.
				auto fieldCount = readInt!ushort(data);
				auto fields = new FieldDescription[fieldCount];
				foreach (n; 0..fieldCount)
				{
				}
				break;
			}
			default:
				// Use the module's exception type, like every other
				// protocol error raised by this class.
				throw new PgSqlException("Unknown packet type '%s'".format(char(type)));
		}
	}

	/// Appends `value` to `buf` in wire format: integers as big-endian
	/// bytes of their own size, strings as their bytes followed by NUL.
	static void write(T)(ref Appender!(ubyte[]) buf, T value)
	{
		static if (is(T : long))
		{
			buf.put(nativeToBigEndian(value)[]);
		}
		else
		static if (is(T : const(char)[]))
		{
			buf.put(cast(const(ubyte)[])value);
			buf.put(ubyte(0));
		}
		else
			static assert(false, "Can't write " ~ T.stringof);
	}

	/// Sends the startup message: protocol version followed by
	/// name/value parameter pairs and an empty-string terminator.
	/// Unlike regular packets, this message has no type byte.
	void sendStartupMessage()
	{
		auto buf = appender!(ubyte[]);

		write(buf, protocolVersionMajor);
		write(buf, protocolVersionMinor);

		write(buf, "user");
		write(buf, user);

		write(buf, "database");
		write(buf, database);

		write(buf, "application_name");
		write(buf, applicationName);

		write(buf, "client_encoding");
		write(buf, "UTF8");

		// An empty name terminates the parameter list.
		write(buf, "");

		// The length prefix includes its own four bytes.
		conn.send(Data(nativeToBigEndian(cast(uint)(buf.data.length + uint.sizeof))[]));
		conn.send(Data(buf.data));
	}

	/// Sends a regular packet: type byte, big-endian length (payload
	/// size plus the four bytes of the length field), then the payload.
	void sendPacket(char type, const(void)[] data)
	{
		conn.send(Data(type.toArray));
		conn.send(Data(nativeToBigEndian(cast(uint)(data.length + uint.sizeof))[]));
		conn.send(Data(data));
	}

	/// Sends a simple Query ('Q') packet containing `query` as a
	/// NUL-terminated string.
	void sendQuery(const(char)[] query)
	{
		auto buf = appender!(ubyte[]);
		write(buf, query);
		sendPacket('Q', buf.data);
	}
}
264 
/// Exception type used by this module for PostgreSQL protocol errors.
mixin DeclareException!"PgSqlException";
266 
// Integration test: only compiled when the HAVE_PSQL_SERVER version is
// set. Connects to localhost:5432, using the value of the USER
// environment variable as both the user and the database name.
version (HAVE_PSQL_SERVER)
unittest
{
	import std.process : environment;

	auto socket = new TcpConnection();
	auto client = new PgSqlConnection(socket, environment["USER"], environment["USER"]);
	socket.connect("localhost", 5432);

	// Issue a single query the first time the server reports it is ready;
	// the handler unregisters itself so the query is sent only once.
	void onReady(PgSqlConnection.TransactionStatus status)
	{
		client.handleReadyForQuery = null;
		client.sendQuery("SELECT 2+2;");
	}
	client.handleReadyForQuery = &onReady;

	socketManager.loop();
}