/*
* R : A Computer Language for Statistical Data Analysis
* Copyright (C) 2009-12 The R Core Team.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, a copy is available at
* http://www.r-project.org/Licenses/
*/
/* This is a small HTTP server that serves requests by evaluating
* the httpd() function and passing the result to the browser. */
/* Example:
httpd <- function(path,query=NULL,...) {
cat("Request for:", path,"\n"); print(query);
list(paste("Hello, world!
You asked for \"",path,"\".",sep=''))
}
.Internal(startHTTPD("127.0.0.1",8080))
*/
/* size of the line buffer for each worker (request and header only)
* requests that have longer headers will be rejected with 413 */
#define LINE_BUF_SIZE 1024
/* maximum number of active workers (parallel connections)
* when exceeded the server closes new connections */
#define MAX_WORKERS 32
/* --- Rhttpd implementation --- */
#ifdef HAVE_CONFIG_H
#include
#endif
#include
#include
#include
#include
#include
#include
#include
#define HttpdServerActivity 8
#define HttpdWorkerActivity 9
/* this is originally from sisock.h - system independent sockets */
#ifndef _WIN32
# include
# include
# ifdef HAVE_UNISTD_H
# include
# endif
# ifdef HAVE_BSD_NETWORKING
# include
# include
# include
# include
# endif
# include
# define sockerrno errno
# define SOCKET int
# define INVALID_SOCKET (-1)
# define closesocket(A) close(A)
# define initsocks()
# define donesocks()
#else
/* --- Windows-only --- */
# include
# include
# include
# define sockerrno WSAGetLastError()
# define ECONNREFUSED WSAECONNREFUSED
# define EADDRINUSE WSAEADDRINUSE
# define ENOTSOCK WSAENOTSOCK
# define EISCONN WSAEISCONN
# define ETIMEDOUT WSAETIMEDOUT
# define ENETUNREACH WSAENETUNREACH
# define EINPROGRESS WSAEINPROGRESS
# define EALREADY WSAEALREADY
# define EAFNOSUPPORT WSAEAFNOSUPPORT
# define EOPNOTSUPP WSAEOPNOTSUPP
# define EWOULDBLOCK WSAEWOULDBLOCK
/* those are occasionally defined by MinGW's errno, so override them
* with socket equivalents */
# ifdef EBADF
# undef EBADF
# endif
# ifdef EINVAL
# undef EINVAL
# endif
# ifdef EFAULT
# undef EFAULT
# endif
# ifdef EACCES
# undef EACCES
# endif
# define EFAULT WSAEFAULT
# define EINVAL WSAEINVAL
# define EACCES WSAEACCES
# define EBADF WSAEBADF
/* Start up WinSock (version 1.1 is requested).
 * Returns 0 on success and -1 if WSAStartup reports a failure. */
static int initsocks(void)
{
    WSADATA winsock_info; /* filled in by WSAStartup, not used afterwards */

    if (WSAStartup(0x0101, &winsock_info))
        return -1;
    return 0;
}
# define donesocks() WSACleanup()
typedef int socklen_t;
#endif /* _WIN32 */
/* --- system-independent part --- */
#define SA struct sockaddr
#define SAIN struct sockaddr_in
/* Fill *sa with an IPv4 (AF_INET) socket address.
 *   sa   - structure to initialize (it is zeroed first)
 *   ip   - dotted-quad string converted with inet_addr(), or NULL to
 *          use the wildcard address INADDR_ANY
 *   port - port number in host byte order (converted with htons())
 * Returns sa cast to a generic struct sockaddr pointer. */
static struct sockaddr *build_sin(struct sockaddr_in *sa, const char *ip, int port) {
    memset(sa, 0, sizeof(struct sockaddr_in));
    sa->sin_family = AF_INET;
    sa->sin_port = htons(port);
    if (ip)
        sa->sin_addr.s_addr = inet_addr(ip);
    else
        sa->sin_addr.s_addr = htonl(INADDR_ANY);
    return (struct sockaddr*) sa;
}
/* --- END of sisock.h --- */
/* debug output - change the DBG(X) X to enable debugging output */
#define DBG(X)
/* --- httpd --- */
#define PART_REQUEST 0
#define PART_HEADER 1
#define PART_BODY 2
#define METHOD_POST 1
#define METHOD_GET 2
#define METHOD_HEAD 3
#define METHOD_OTHER 8 /* for custom requests only */
/* attributes of a connection/worker */
#define CONNECTION_CLOSE 0x01 /* Connection: close response behavior is requested */
#define HOST_HEADER 0x02 /* headers contained Host: header (required for HTTP/1.1) */
#define HTTP_1_0 0x04 /* the client requested HTTP/1.0 */
#define CONTENT_LENGTH 0x08 /* Content-length: was specified in the headers */
#define THREAD_OWNED 0x10 /* the worker is owned by a thread and cannot removed */
#define THREAD_DISPOSE 0x20 /* the thread should dispose of the worker */
#define CONTENT_TYPE 0x40 /* message has a specific content type set */
#define CONTENT_FORM_UENC 0x80 /* message content type is application/x-www-form-urlencoded */
/* one node of a doubly-linked chain of byte buffers; the chain is
 * grown by alloc_buffer(), released by free_buffer() and flattened
 * into a raw vector by collect_buffers() */
struct buffer {
struct buffer *next, *prev; /* following / preceding node in the chain (NULL at the ends) */
size_t size, length; /* size: payload capacity requested at allocation, length: payload bytes in use */
char data[1]; /* start of the payload - alloc_buffer() allocates "size" additional bytes past the struct */
};
/* we have to protect re-entrance and not continue processing if there is
a worker inside R already. If we did not then another client connection
would trigger handler and pile up eval on top of the stack, leading to
exhaustion very quickly and a big mess */
static int in_process;
/* --- connection/worker structure holding all data for an active connection ---
 * The heap-allocated members (url, body, content_type, headers) are
 * released by finalize_worker() and are also reset between requests
 * on a keep-alive connection. */
typedef struct httpd_conn {
SOCKET sock; /* client socket (INVALID_SOCKET once closed) */
struct in_addr peer; /* IP address of the peer */
#ifdef _WIN32
HANDLE thread; /* worker thread */
#else
InputHandler *ih; /* worker input handler */
#endif
char line_buf[LINE_BUF_SIZE]; /* line buffer (used for request and headers) */
char *url, *body; /* URL and request body (body is allocated with one extra byte for a terminator) */
char *content_type; /* content type (if set), stored lower-cased */
size_t line_pos, body_pos; /* positions in the buffers (number of bytes currently held in line_buf / body) */
long content_length; /* desired content length (value parsed from the Content-length header) */
char part, method, attr; /* request part (PART_*), method (METHOD_*) and connection attributes (bit flags) */
struct buffer *headers; /* buffer holding header lines (points to the tail of the chain, lines are '\n' terminated) */
} httpd_conn_t;
#define IS_HTTP_1_1(C) (((C)->attr & HTTP_1_0) == 0)
/* returns the HTTP/x.x string for a given connection - we support 1.0 and 1.1 only */
#define HTTP_SIG(C) (IS_HTTP_1_1(C) ? "HTTP/1.1" : "HTTP/1.0")
/* --- static list of currently active workers --- */
static httpd_conn_t *workers[MAX_WORKERS];
/* --- flag determining whether one-time initialization is yet to be performed --- */
static int needs_init = 1;
#ifdef _WIN32
#define WM_RHTTP_CALLBACK ( WM_USER + 1 )
static HWND message_window;
static LRESULT CALLBACK
RhttpdWindowProc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam);
#ifndef HWND_MESSAGE
#define HWND_MESSAGE ((HWND)-3) /* NOTE: this is supported by W2k/XP and up only! */
#endif
#endif
/* One-time initialization: sets up the socket layer and, on Windows,
 * registers a window class and creates the message-only window that is
 * used to synchronize with the main event loop. Clears needs_init. */
static void first_init()
{
    initsocks();
#ifdef _WIN32
    {
        /* the window procedure RhttpdWindowProc receives the callbacks */
        HINSTANCE module = GetModuleHandle(NULL);
        LPCTSTR window_class = "Rhttpd";
        WNDCLASS wc = { 0, RhttpdWindowProc, 0, 0, module, NULL, 0, 0,
                        NULL, window_class };

        RegisterClass(&wc);
        message_window = CreateWindow(window_class, "Rhttpd", 0, 1, 1, 1, 1,
                                      HWND_MESSAGE, NULL, module, NULL);
    }
#endif
    needs_init = 0;
}
/* Release a chain of buffers. buf must be the tail(!!) of the chain:
 * the chain is followed through the prev links only, so any nodes
 * reachable only via next are not touched. NULL is accepted. */
static void free_buffer(struct buffer *buf) {
    while (buf) {
        struct buffer *older = buf->prev; /* remember the link before the node goes away */
        free(buf);
        buf = older;
    }
}
/* Allocate a new, empty buffer with room for size payload bytes and
 * append it after parent (which may be NULL to start a new chain).
 * Returns the new buffer or NULL if the allocation failed, in which
 * case parent is left untouched. */
static struct buffer *alloc_buffer(int size, struct buffer *parent) {
    struct buffer *node = (struct buffer*) malloc(sizeof(struct buffer) + size);

    if (node) {
        node->size = size;
        node->length = 0;
        node->next = 0;
        node->prev = parent;
        if (parent)
            parent->next = node;
    }
    return node;
}
/* Concatenate the payload of a doubly-linked buffer chain into one raw
 * vector. buf is followed back to the root through the prev links and
 * the data is then copied in order through the next links. A NULL
 * chain yields a raw vector of length zero. The result is not
 * protected. */
static SEXP collect_buffers(struct buffer *buf) {
    int total = 0;
    SEXP res;
    char *out;
    struct buffer *node;

    if (!buf) return allocVector(RAWSXP, 0);
    /* rewind to the root, adding up the length of every node passed */
    for (; buf->prev; buf = buf->prev)
        total += buf->length;
    /* the root itself has not been counted yet */
    res = allocVector(RAWSXP, total + buf->length);
    out = (char*) RAW(res);
    for (node = buf; node; node = node->next) {
        memcpy(out, node->data, node->length);
        out += node->length;
    }
    return res;
}
/* Release everything a worker holds - its input handler (unix only),
 * URL, body, content type, header buffers and the client socket - and
 * reset the corresponding fields. The worker structure itself is NOT
 * freed and the worker list is not modified. */
static void finalize_worker(httpd_conn_t *c)
{
    DBG(printf("finalizing worker %p\n", (void*) c));
#ifndef _WIN32
    if (c->ih) {
        removeInputHandler(&R_InputHandlers, c->ih);
        c->ih = NULL;
    }
#endif
    /* free() and free_buffer() both accept NULL, so no guards are needed */
    free(c->url);
    c->url = NULL;
    free(c->body);
    c->body = NULL;
    free(c->content_type);
    c->content_type = NULL;
    free_buffer(c->headers);
    c->headers = NULL;
    if (c->sock != INVALID_SOCKET) {
        closesocket(c->sock);
        c->sock = INVALID_SOCKET;
    }
}
/* Register a worker in the first free slot of the worker list and
 * return 0. If every slot is taken, the worker is finalized and freed
 * and -1 is returned (the caller must not use c afterwards).
 * Note that we don't need locking, because add_worker is guaranteed
 * to be called by the same thread (server thread).
 */
static int add_worker(httpd_conn_t *c) {
    unsigned int slot;

    for (slot = 0; slot < MAX_WORKERS; slot++) {
        if (workers[slot])
            continue; /* occupied */
#ifdef _WIN32
        DBG(printf("registering worker %p as %d (thread=0x%x)\n", (void*) c, slot, (int) c->thread));
#else
        DBG(printf("registering worker %p as %d (handler=%p)\n", (void*) c, slot, (void*) c->ih));
#endif
        workers[slot] = c;
        return 0;
    }
    /* FIXME: ok no more space for a new worker - what do we do now?
     * for now we just drop it on the floor .. */
    finalize_worker(c);
    free(c);
    return -1;
}
/* Finalize a worker, clear every slot of the worker list that refers
 * to it and free its memory. NULL is ignored. A worker that carries
 * the THREAD_OWNED flag is left alone: only THREAD_DISPOSE is set so
 * the owning thread can dispose of it. */
static void remove_worker(httpd_conn_t *c)
{
    unsigned int slot;

    if (!c) return;
    if (c->attr & THREAD_OWNED) {
        /* in use by a thread - we can only signal for its removal */
        c->attr |= THREAD_DISPOSE;
        return;
    }
    finalize_worker(c);
    for (slot = 0; slot < MAX_WORKERS; slot++) {
        if (workers[slot] == c)
            workers[slot] = NULL;
    }
    DBG(printf("removing worker %p\n", (void*) c));
    free(c);
}
#ifndef Win32
extern int R_ignore_SIGPIPE; /* defined in src/main/main.c on unix */
#else
static int R_ignore_SIGPIPE; /* for simplicity of the code below */
#endif
/* Send exactly len bytes from buf over socket s, calling send()
 * repeatedly until everything has been written.
 * Returns 0 on success and -1 as soon as send() reports an error or
 * writes nothing.
 * R_ignore_SIGPIPE is raised for the duration of the transfer: we have
 * to tell R to ignore SIGPIPE, otherwise it can raise an error and get
 * us into deep trouble. */
static int send_response(SOCKET s, const char *buf, size_t len)
{
    /* the progress counter has the same type as len so it cannot wrap
     * around before the whole payload has been sent */
    size_t sent = 0;
    int rc = 0;

    R_ignore_SIGPIPE = 1;
    while (sent < len) {
        ssize_t n = send(s, buf + sent, len - sent, 0);
        if (n < 1) {
            rc = -1;
            break;
        }
        sent += (size_t) n;
    }
    R_ignore_SIGPIPE = 0; /* single exit path - the flag is always restored */
    return rc;
}
/* Send the protocol signature of the connection (HTTP/1.0 or HTTP/1.1,
 * always 8 characters) followed by text, which should be of the form
 * " XXX ...". Returns 0 on success, -1 on a send failure. */
static int send_http_response(httpd_conn_t *c, const char *text) {
    const char *sig = HTTP_SIG(c);
    size_t text_len = strlen(text);
    char packet[96];

    if (text_len < sizeof(packet) - 10) {
        /* short response: assemble signature and text in a local
         * buffer so they go out en bloc, reducing the number of packets */
        strcpy(packet, sig);
        strcpy(packet + 8, text);
        return send_response(c->sock, packet, text_len + 8);
    } else {
        /* too long for the local buffer: signature first, text after */
        ssize_t sent;

        R_ignore_SIGPIPE = 1;
        sent = send(c->sock, sig, 8, 0);
        R_ignore_SIGPIPE = 0;
        if (sent < 8) return -1;
        return send_response(c->sock, text, strlen(text));
    }
}
/* value of one hexadecimal digit; any other character counts as 0 */
static unsigned char uri_hex_nibble(char ch)
{
    if (ch >= '0' && ch <= '9') return (unsigned char)(ch - '0');
    if (ch >= 'a' && ch <= 'f') return (unsigned char)(ch - 'a' + 10);
    if (ch >= 'A' && ch <= 'F') return (unsigned char)(ch - 'A' + 10);
    return 0;
}

/* Decode a URI-encoded string in place (decoding never expands, so the
 * write position never overtakes the read position):
 *   '+'   becomes a space
 *   "%XY" becomes the byte with hexadecimal value XY; characters that
 *         are not hex digits contribute 0 and are still consumed, and
 *         a sequence cut short by the end of the string uses 0 for the
 *         missing digits
 * everything else is copied unchanged. */
static void uri_decode(char *s)
{
    const char *in = s;
    char *out = s;

    while (*in) {
        switch (*in) {
        case '+': /* + -> SPC */
            *out++ = ' ';
            in++;
            break;
        case '%': {
            unsigned char byte;

            in++; /* skip the '%' itself */
            byte = (unsigned char)(uri_hex_nibble(*in) << 4);
            if (*in) in++;
            byte |= uri_hex_nibble(*in);
            if (*in) in++;
            *out++ = (char) byte;
            break;
        }
        default:
            *out++ = *in++;
        }
    }
    *out = 0;
}
/* Parse a query string ("key=value&key2=value2...") into a named
 * character vector: the values are the elements, the keys the names.
 * The input must NOT be URI decoded yet - '&' and '=' are interpreted
 * before '+' and %XY sequences are decoded. Parts without '=' get an
 * empty name. The string is decoded and split in place, so query is
 * modified. */
static SEXP parse_query(char *query)
{
    int count = 1, idx = 0;
    SEXP values, names;
    char *src, *dst = query, *key = 0, *value = query;

    /* n separators mean n + 1 parts */
    for (src = query; *src; src++)
        if (*src == '&') count++;

    values = PROTECT(allocVector(STRSXP, count));
    names = PROTECT(allocVector(STRSXP, count));

    src = query;
    for (;;) {
        /* fetch the character first - the terminator written below may
         * land on the very position we are reading from */
        char ch = *src;

        if (ch == '=' && !key) { /* first '=' in a part */
            key = value;
            *(dst++) = 0;
            value = dst;
            src++;
        } else if (ch == '&' || !ch) { /* end of this part */
            *(dst++) = 0;
            if (!key) key = "";
            SET_STRING_ELT(names, idx, mkChar(key));
            SET_STRING_ELT(values, idx, mkChar(value));
            idx++;
            if (!ch) break; /* that was the last part */
            key = 0;
            value = dst;
            src++;
        } else if (ch == '+') { /* + -> SPC */
            *(dst++) = ' ';
            src++;
        } else if (ch == '%') { /* we cannot use uri_decode because we need &/= *before* decoding */
            unsigned char ec = 0;
            int digit;

            src++;
            /* two hex digits, high nibble first; non-hex characters
             * count as 0 but are consumed unless the string ends */
            for (digit = 0; digit < 2; digit++) {
                unsigned char nib = 0;

                if (*src >= '0' && *src <= '9') nib = (unsigned char)(*src - '0');
                else if (*src >= 'a' && *src <= 'f') nib = (unsigned char)(*src - 'a' + 10);
                else if (*src >= 'A' && *src <= 'F') nib = (unsigned char)(*src - 'A' + 10);
                ec = (unsigned char)((ec << 4) | nib);
                if (*src) src++;
            }
            *(dst++) = (char) ec;
        } else
            *(dst++) = *(src++);
    }
    setAttrib(values, R_NamesSymbol, names);
    UNPROTECT(2);
    return values;
}
static SEXP R_ContentTypeName, R_HandlersName;
/* Create the R object representing the request body:
 *  - R_NilValue if there is no connection or no body buffer
 *  - for a URL encoded form the result of parse_query() on the body,
 *    i.e. the same shape as the query string (named string vector)
 *  - otherwise a raw vector of content_length bytes which carries a
 *    "content-type" attribute if a content type was specified */
static SEXP parse_request_body(httpd_conn_t *c) {
    SEXP raw;

    if (!c || !c->body) return R_NilValue;

    if (c->attr & CONTENT_FORM_UENC) { /* URL encoded form - return parsed form */
        /* the body is guaranteed to have an extra byte for the termination */
        c->body[c->content_length] = 0;
        return parse_query(c->body);
    }

    /* something else - pass it as a raw vector */
    raw = PROTECT(Rf_allocVector(RAWSXP, c->content_length));
    if (c->content_length)
        memcpy(RAW(raw), c->body, c->content_length);
    if (c->content_type) { /* attach the content type so it can be interpreted */
        if (!R_ContentTypeName) R_ContentTypeName = install("content-type");
        setAttrib(raw, R_ContentTypeName, mkString(c->content_type));
    }
    UNPROTECT(1);
    return raw;
}
#ifdef _WIN32
/* on Windows we have to guarantee that process_request is performed
 * on the main thread, so we have to dispatch it through a message */
static void process_request_main_thread(httpd_conn_t *c);
/* Windows flavour of process_request(): hands the worker to the
 * message-only window; the window procedure then calls
 * process_request_main_thread() with it. */
static void process_request(httpd_conn_t *c)
{
/* SendMessage is synchronous, so it will wait until the message
 * is processed */
DBG(Rprintf("enqueuing process_request_main_thread\n"));
SendMessage(message_window, WM_RHTTP_CALLBACK, 0, (LPARAM) c);
DBG(Rprintf("process_request_main_thread returned\n"));
}
/* From here on the name process_request expands to
 * process_request_main_thread, so the function that is defined under
 * the name process_request further below becomes the implementation of
 * process_request_main_thread on Windows. The macro is removed again
 * with #undef right after that definition, which makes later uses of
 * process_request refer to the wrapper above. */
#define process_request process_request_main_thread
#endif
/* Finish a request: a HTTP/1.0 connection is flagged to be closed,
 * a HTTP/1.1 connection is left as it is. */
static void fin_request(httpd_conn_t *c) {
    if (c->attr & HTTP_1_0)
        c->attr |= CONNECTION_CLOSE;
}
static SEXP custom_handlers_env;
/* Returns the httpd handler for a given path: either a closure or, as
 * the default, the symbol "httpd" (the caller evaluates the call in
 * the "tools" namespace, which is where the symbol gets resolved).
 * Custom handlers are supported for paths of the form
 * /custom/NAME[/.*] where NAME must be 1 to 63 characters long; NAME
 * is looked up in the environment tools:::.httpd.handlers.env and is
 * used only if it is bound to a closure. */
static SEXP handler_for_path(const char *path) {
    if (path && strncmp(path, "/custom/", 8) == 0) { /* starts with /custom/ ? */
        const char *name = path + 8;
        const char *end = name;

        /* the handler name extends to the next '/' or the end of the path */
        while (*end && *end != '/') end++;
        if (end > name && end - name < 64) { /* if it's 1..63 chars long, proceed */
            char fn[64];

            /* create a local C string with the name for the install() call */
            memcpy(fn, name, end - name);
            fn[end - name] = 0;
            DBG(Rprintf("handler_for_path('%s'): looking up custom handler '%s'\n", path, fn));
            /* we cache custom_handlers_env so in case it has not been loaded yet, fetch it */
            if (!custom_handlers_env) {
                if (!R_HandlersName) R_HandlersName = install(".httpd.handlers.env");
                custom_handlers_env = eval(R_HandlersName, R_FindNamespace(mkString("tools")));
            }
            /* we only proceed if .httpd.handlers.env really exists */
            if (TYPEOF(custom_handlers_env) == ENVSXP) {
                SEXP candidate = findVarInFrame3(custom_handlers_env, install(fn), TRUE);

                /* we need a closure */
                if (candidate != R_UnboundValue && TYPEOF(candidate) == CLOSXP)
                    return candidate;
            }
        }
    }
    DBG(Rprintf(" - falling back to default httpd\n"));
    return install("httpd");
}
/* Process a request by calling the httpd() function (or a custom
 * handler, see handler_for_path) in R and sending its result to the
 * client.
 *
 * ptr is the worker (httpd_conn_t*); the signature is void* because
 * the function is run through R_ToplevelExec. Nothing happens if the
 * worker or its URL is missing. The URL is split at the first '?' into
 * path and query in place and the path is URI decoded.
 *
 * On an evaluation error, an invalid result or a failure while
 * streaming a file the CONNECTION_CLOSE attribute is set on the
 * worker; after a successful response fin_request() decides.
 *
 * A file payload is opened here and is closed again on every exit
 * path, including the read-error and out-of-memory paths. */
static void process_request_(void *ptr)
{
    httpd_conn_t *c = (httpd_conn_t*) ptr;
    const char *ct = "text/html";
    char *query = 0, *s;
    SEXP sHeaders = R_NilValue;
    int code = 200;

    DBG(Rprintf("process request for %p\n", (void*) c));
    if (!c || !c->url) return; /* if there is not enough to process, bail out */
    s = c->url;
    while (*s && *s != '?') s++; /* find the query part */
    if (*s) {
        *(s++) = 0;
        query = s;
    }
    uri_decode(c->url); /* decode the path part */
    { /* construct "try(httpd(url, query, body), silent=TRUE)" */
        /* seven objects are protected in this block, hence UNPROTECT(7)
         * on every way out */
        SEXP sTrue = PROTECT(ScalarLogical(TRUE));
        SEXP sBody = PROTECT(parse_request_body(c));
        SEXP sQuery = PROTECT(query ? parse_query(query) : R_NilValue);
        SEXP sReqHeaders = PROTECT(c->headers ? collect_buffers(c->headers) : R_NilValue);
        SEXP sArgs = PROTECT(list4(mkString(c->url), sQuery, sBody, sReqHeaders));
        SEXP sTry = install("try");
        SEXP y, x = PROTECT(lang3(sTry,
                                  LCONS(handler_for_path(c->url), sArgs),
                                  sTrue));
        SET_TAG(CDR(CDR(x)), install("silent"));
        DBG(Rprintf("eval(try(httpd('%s'),silent=TRUE))\n", c->url));
        /* evaluate the above in the tools namespace */
        x = PROTECT(eval(x, R_FindNamespace(mkString("tools"))));
        /* the result is expected to have one of the following forms:
           a) character vector of length 1 => error (possibly from try),
              will create 500 response
           b) list(payload[, content-type[, headers[, status code]]])
              payload: can be a character vector of length one or a
                raw vector. if the character vector is named "file" then
                the content of a file of that name is the payload
              content-type: must be a character vector of length one
                or NULL (if present, else default is "text/html")
              headers: must be a character vector - the elements will
                have CRLF appended and neither Content-type nor
                Content-length may be used
              status code: must be an integer if present (default is 200)
        */
        if (TYPEOF(x) == STRSXP && LENGTH(x) > 0) { /* string means there was an error */
            const char *s = CHAR(STRING_ELT(x, 0));
            send_http_response(c, " 500 Evaluation error\r\nConnection: close\r\nContent-type: text/plain\r\n\r\n");
            DBG(Rprintf("respond with 500 and content: %s\n", s));
            if (c->method != METHOD_HEAD)
                send_response(c->sock, s, strlen(s));
            c->attr |= CONNECTION_CLOSE; /* force close */
            UNPROTECT(7);
            return;
        }
        if (TYPEOF(x) == VECSXP && LENGTH(x) > 0) { /* a list (generic vector) can be a real payload */
            SEXP xNames = getAttrib(x, R_NamesSymbol);
            if (LENGTH(x) > 1) {
                SEXP sCT = VECTOR_ELT(x, 1); /* second element is content type if present */
                if (TYPEOF(sCT) == STRSXP && LENGTH(sCT) > 0)
                    ct = CHAR(STRING_ELT(sCT, 0));
                if (LENGTH(x) > 2) { /* third element is headers vector */
                    sHeaders = VECTOR_ELT(x, 2);
                    if (TYPEOF(sHeaders) != STRSXP)
                        sHeaders = R_NilValue;
                    if (LENGTH(x) > 3) /* fourth element is HTTP code */
                        code = asInteger(VECTOR_ELT(x, 3));
                }
            }
            y = VECTOR_ELT(x, 0);
            if (TYPEOF(y) == STRSXP && LENGTH(y) > 0) {
                char buf[64];
                const char *cs = CHAR(STRING_ELT(y, 0)), *fn = 0;
                /* status line up to and including "Content-type: " */
                if (code == 200)
                    send_http_response(c, " 200 OK\r\nContent-type: ");
                else {
                    sprintf(buf, "%s %d Code %d\r\nContent-type: ", HTTP_SIG(c), code, code);
                    send_response(c->sock, buf, strlen(buf));
                }
                send_response(c->sock, ct, strlen(ct));
                /* each custom header is preceded by the CRLF that
                 * terminates the previous line */
                if (sHeaders != R_NilValue) {
                    unsigned int i = 0, n = LENGTH(sHeaders);
                    for (; i < n; i++) {
                        const char *hs = CHAR(STRING_ELT(sHeaders, i));
                        send_response(c->sock, "\r\n", 2);
                        send_response(c->sock, hs, strlen(hs));
                    }
                }
                /* special content - a file: either list(file="NAME") or
                 * list(c("*FILE*", "NAME")) - the latter will go away */
                if (TYPEOF(xNames) == STRSXP && LENGTH(xNames) > 0 &&
                    !strcmp(CHAR(STRING_ELT(xNames, 0)), "file"))
                    fn = cs;
                if (LENGTH(y) > 1 && !strcmp(cs, "*FILE*"))
                    fn = CHAR(STRING_ELT(y, 1));
                if (fn) {
                    char *fbuf;
                    FILE *f = fopen(fn, "rb");
                    long fsz = 0;
                    if (!f) { /* cannot open the file - respond with an empty body */
                        send_response(c->sock, "\r\nContent-length: 0\r\n\r\n", 23);
                        UNPROTECT(7);
                        fin_request(c);
                        return;
                    }
                    /* determine the file size for the Content-length header */
                    fseek(f, 0, SEEK_END);
                    fsz = ftell(f);
                    fseek(f, 0, SEEK_SET);
                    sprintf(buf, "\r\nContent-length: %ld\r\n\r\n", fsz);
                    send_response(c->sock, buf, strlen(buf));
                    if (c->method != METHOD_HEAD) {
                        /* stream the file in chunks of up to 32kB */
                        fbuf = (char*) malloc(32768);
                        if (fbuf) {
                            while (fsz > 0 && !feof(f)) {
                                size_t rd = (fsz > 32768) ? 32768 : fsz;
                                if (fread(fbuf, 1, rd, f) != rd) {
                                    /* short read - the announced length
                                     * cannot be honoured, so the connection
                                     * has to be closed */
                                    free(fbuf);
                                    fclose(f); /* do not leak the file handle */
                                    UNPROTECT(7);
                                    c->attr |= CONNECTION_CLOSE;
                                    return;
                                }
                                send_response(c->sock, fbuf, rd);
                                fsz -= rd;
                            }
                            free(fbuf);
                        } else { /* allocation error - get out */
                            fclose(f); /* do not leak the file handle */
                            UNPROTECT(7);
                            c->attr |= CONNECTION_CLOSE;
                            return;
                        }
                    }
                    fclose(f);
                    UNPROTECT(7);
                    fin_request(c);
                    return;
                }
                /* regular text payload */
                sprintf(buf, "\r\nContent-length: %u\r\n\r\n", (unsigned int) strlen(cs));
                send_response(c->sock, buf, strlen(buf));
                if (c->method != METHOD_HEAD)
                    send_response(c->sock, cs, strlen(cs));
                UNPROTECT(7);
                fin_request(c);
                return;
            }
            if (TYPEOF(y) == RAWSXP) { /* binary payload */
                char buf[64];
                Rbyte *cs = RAW(y);
                if (code == 200)
                    send_http_response(c, " 200 OK\r\nContent-type: ");
                else {
                    sprintf(buf, "%s %d Code %d\r\nContent-type: ", HTTP_SIG(c), code, code);
                    send_response(c->sock, buf, strlen(buf));
                }
                send_response(c->sock, ct, strlen(ct));
                if (sHeaders != R_NilValue) {
                    unsigned int i = 0, n = LENGTH(sHeaders);
                    for (; i < n; i++) {
                        const char *hs = CHAR(STRING_ELT(sHeaders, i));
                        send_response(c->sock, "\r\n", 2);
                        send_response(c->sock, hs, strlen(hs));
                    }
                }
                sprintf(buf, "\r\nContent-length: %u\r\n\r\n", LENGTH(y));
                send_response(c->sock, buf, strlen(buf));
                if (c->method != METHOD_HEAD)
                    send_response(c->sock, (char*) cs, LENGTH(y));
                UNPROTECT(7);
                fin_request(c);
                return;
            }
        }
        UNPROTECT(7);
    }
    /* the result had none of the accepted forms */
    send_http_response(c, " 500 Invalid response from R\r\nConnection: close\r\nContent-type: text/plain\r\n\r\nServer error: invalid response from R\r\n");
    c->attr |= CONNECTION_CLOSE; /* force close */
}
/* wrap the actual call with ToplevelExec since we need to have a guaranteed
return so we can track the presence of a worker code inside R to prevent
re-entrance from other clients.
The in_process flag is set for the duration of the call; the worker input
handler returns immediately while it is set.
Note that on Windows the name process_request is a macro at this point, so
this definition is compiled as process_request_main_thread there. */
static void process_request(httpd_conn_t *c)
{
in_process = 1; /* mark that a worker is inside R */
R_ToplevelExec(process_request_, c);
in_process = 0; /* reached regardless of how the evaluation ended (see above) */
}
#ifdef _WIN32
#undef process_request
#endif
/* This function is called to fetch new data from the client
 * connection socket and process it.
 *
 * data is the worker (httpd_conn_t*). The call is ignored if data is
 * NULL or while a request is being evaluated in R (in_process).
 *
 * The worker walks through three parts: the request line, the headers
 * (both parsed line by line out of line_buf) and the body (received
 * straight into c->body). A complete request is passed on to
 * process_request(); afterwards the worker is either removed or, for
 * keep-alive connections, reset for the next request.
 *
 * Whenever remove_worker() has been called the function returns
 * immediately since c may not be used any more.
 *
 * Changes compared to the previous version:
 *  - a single line that fills the whole line buffer is now rejected
 *    with 413 (the former test could never detect a full buffer,
 *    because recv always leaves room for the terminating NUL)
 *  - a failed allocation of the header buffer for /custom/ requests
 *    is no longer dereferenced
 *  - when a header line is split across two header buffers the part
 *    stored in the first buffer is now accounted for in its length,
 *    so it is not lost when the buffers are collected */
static void worker_input_handler(void *data) {
    httpd_conn_t *c = (httpd_conn_t*) data;

    DBG(printf("worker_input_handler, data=%p\n", data));
    if (!c) return;
    if (in_process) return; /* we don't allow recursive entrance */

    DBG(printf("input handler for worker %p (sock=%d, part=%d, method=%d, line_pos=%d)\n", (void*) c, (int)c->sock, (int)c->part, (int)c->method, (int)c->line_pos));

    /* FIXME: there is one edge case that is not caught on unix: if
     * recv reads two or more full requests into the line buffer then
     * this function exits after the first one, but input handlers may
     * not trigger, because there may be no further data. It is not
     * trivial to fix, because just checking for a full line at the
     * beginning and not calling recv won't trigger a new input
     * handler. However, under normal circumstance this should not
     * happen, because clients should wait for the response and even
     * if they don't it's unlikely that both requests get combined
     * into one packet. */
    if (c->part < PART_BODY) {
        char *s = c->line_buf;
        /* one byte is always kept free for the terminating NUL */
        ssize_t n = recv(c->sock, c->line_buf + c->line_pos,
                         LINE_BUF_SIZE - c->line_pos - 1, 0);
        DBG(printf("[recv n=%d, line_pos=%d, part=%d]\n", n, c->line_pos, (int)c->part));
        if (n < 0) { /* error, scrap this worker */
            remove_worker(c);
            return;
        }
        if (n == 0) { /* connection closed -> try to process and then remove */
            process_request(c);
            remove_worker(c);
            return;
        }
        c->line_pos += n;
        c->line_buf[c->line_pos] = 0;
        DBG(printf("in buffer: {%s}\n", c->line_buf));
        while (*s) {
            /* ok, we have genuine data in the line buffer */
            if (s[0] == '\n' || (s[0] == '\r' && s[1] == '\n')) { /* single, empty line - end of headers */
                /* --- check request validity --- */
                DBG(printf(" end of request, moving to body\n"));
                if (!(c->attr & HTTP_1_0) && !(c->attr & HOST_HEADER)) { /* HTTP/1.1 mandates Host: header */
                    send_http_response(c, " 400 Bad Request (Host: missing)\r\nConnection: close\r\n\r\n");
                    remove_worker(c);
                    return;
                }
                if (c->attr & CONTENT_LENGTH && c->content_length) {
                    if (c->content_length < 0 || /* we are parsing signed so negative numbers are bad */
                        c->content_length > 2147483640 || /* R will currently have issues with body around 2Gb or more, so better to not go there */
                        !(c->body = (char*) malloc(c->content_length + 1 /* allocate an extra termination byte */ ))) {
                        send_http_response(c, " 413 Request Entity Too Large (request body too big)\r\nConnection: close\r\n\r\n");
                        remove_worker(c);
                        return;
                    }
                }
                c->body_pos = 0;
                c->part = PART_BODY;
                if (s[0] == '\r') s++;
                s++;
                /* move the body part to the beginning of the buffer */
                c->line_pos -= s - c->line_buf;
                memmove(c->line_buf, s, c->line_pos);
                /* GET/HEAD or no content length mean no body */
                if (c->method == METHOD_GET || c->method == METHOD_HEAD ||
                    !(c->attr & CONTENT_LENGTH) || c->content_length == 0) {
                    if ((c->attr & CONTENT_LENGTH) && c->content_length > 0) {
                        send_http_response(c, " 400 Bad Request (GET/HEAD with body)\r\n\r\n");
                        remove_worker(c);
                        return;
                    }
                    process_request(c);
                    if (c->attr & CONNECTION_CLOSE) {
                        remove_worker(c);
                        return;
                    }
                    /* keep-alive - reset the worker so it can process a new request */
                    if (c->url) { free(c->url); c->url = NULL; }
                    if (c->body) { free(c->body); c->body = NULL; }
                    if (c->content_type) { free(c->content_type); c->content_type = NULL; }
                    if (c->headers) { free_buffer(c->headers); c->headers = NULL; }
                    c->body_pos = 0;
                    c->method = 0;
                    c->part = PART_REQUEST;
                    c->attr = 0;
                    c->content_length = 0;
                    return;
                }
                /* copy body content (as far as available) */
                c->body_pos = (c->content_length < c->line_pos) ? c->content_length : c->line_pos;
                if (c->body_pos) {
                    memcpy(c->body, c->line_buf, c->body_pos);
                    c->line_pos -= c->body_pos; /* NOTE: we are NOT moving the buffer since non-zero left-over causes connection close */
                }
                /* POST will continue into the BODY part */
                break;
            }
            {
                char *bol = s; /* beginning of the current line */
                while (*s && *s != '\r' && *s != '\n') s++;
                if (!*s) { /* incomplete line */
                    if (bol == c->line_buf) {
                        /* recv never fills the last byte of the buffer,
                         * so the buffer is full at LINE_BUF_SIZE - 1 */
                        if (c->line_pos < LINE_BUF_SIZE - 1) /* one, incomplete line, but the buffer is not full yet, just return */
                            return;
                        /* the buffer is full yet the line is incomplete - we're in trouble */
                        send_http_response(c, " 413 Request entity too large\r\nConnection: close\r\n\r\n");
                        remove_worker(c);
                        return;
                    }
                    /* move the line to the beginning of the buffer for later requests */
                    c->line_pos -= bol - c->line_buf;
                    memmove(c->line_buf, bol, c->line_pos);
                    return;
                } else { /* complete line, great! */
                    /* terminate the line by overwriting the line break */
                    if (*s == '\r') *(s++) = 0;
                    if (*s == '\n') *(s++) = 0;
                    DBG(printf("complete line: {%s}\n", bol));
                    if (c->part == PART_REQUEST) {
                        /* --- process request line --- */
                        size_t rll = strlen(bol); /* request line length */
                        char *url = strchr(bol, ' ');
                        if (!url || rll < 14 || strncmp(bol + rll - 9, " HTTP/1.", 8)) { /* each request must have at least 14 characters [GET / HTTP/1.0] and have HTTP/1.x */
                            send_response(c->sock, "HTTP/1.0 400 Bad Request\r\n\r\n", 28);
                            remove_worker(c);
                            return;
                        }
                        url++;
                        if (!strncmp(bol + rll - 3, "1.0", 3)) c->attr |= HTTP_1_0;
                        if (!strncmp(bol, "GET ", 4)) c->method = METHOD_GET;
                        if (!strncmp(bol, "POST ", 5)) c->method = METHOD_POST;
                        if (!strncmp(bol, "HEAD ", 5)) c->method = METHOD_HEAD;
                        /* only custom handlers can use other methods */
                        if (!strncmp(url, "/custom/", 8)) {
                            char *mend = url - 1; /* the space that ends the method name */
                            /* we generate a header with the method so it can be passed to the handler */
                            if (!c->headers)
                                c->headers = alloc_buffer(1024, NULL);
                            /* make sure the buffer exists and the header fits */
                            if (c->headers &&
                                c->headers->size - c->headers->length >= 18 + (mend - bol)) {
                                if (!c->method) c->method = METHOD_OTHER;
                                /* add "Request-Method: xxx" */
                                memcpy(c->headers->data + c->headers->length, "Request-Method: ", 16);
                                c->headers->length += 16;
                                memcpy(c->headers->data + c->headers->length, bol, mend - bol);
                                c->headers->length += mend - bol;
                                c->headers->data[c->headers->length++] = '\n';
                            }
                        }
                        if (!c->method) {
                            send_http_response(c, " 501 Invalid or unimplemented method\r\n\r\n");
                            remove_worker(c);
                            return;
                        }
                        /* cut off the trailing " HTTP/1.x" so only the URL remains */
                        bol[strlen(bol) - 9] = 0;
                        c->url = strdup(url);
                        c->part = PART_HEADER;
                        DBG(printf("parsed request, method=%d, URL='%s'\n", (int)c->method, c->url));
                    } else if (c->part == PART_HEADER) {
                        /* --- process headers --- */
                        char *k = bol;
                        if (!c->headers)
                            c->headers = alloc_buffer(1024, NULL);
                        if (c->headers) { /* record the header line in the buffer */
                            size_t l = strlen(bol);
                            if (l) { /* this should be really always true */
                                if (c->headers->length + l + 1 > c->headers->size) { /* not enough space? */
                                    /* fits <= l here, so the line is split:
                                     * the first "fits" bytes fill up the current
                                     * buffer, the rest goes into a new one */
                                    size_t fits = c->headers->size - c->headers->length;
                                    if (alloc_buffer(2048, c->headers)) {
                                        if (fits) {
                                            memcpy(c->headers->data + c->headers->length, bol, fits);
                                            /* account for the stored bytes, otherwise
                                             * they are dropped when the buffers are collected */
                                            c->headers->length += fits;
                                        }
                                        c->headers = c->headers->next;
                                        memcpy(c->headers->data, bol + fits, l - fits);
                                        c->headers->length = l - fits;
                                        c->headers->data[c->headers->length++] = '\n';
                                    }
                                } else {
                                    memcpy(c->headers->data + c->headers->length, bol, l);
                                    c->headers->length += l;
                                    c->headers->data[c->headers->length++] = '\n';
                                }
                            }
                        }
                        /* lower-case the header name (in the line buffer only) */
                        while (*k && *k != ':') {
                            if (*k >= 'A' && *k <= 'Z')
                                *k |= 0x20;
                            k++;
                        }
                        if (*k == ':') {
                            /* split into name (bol) and value (k) */
                            *(k++) = 0;
                            while (*k == ' ' || *k == '\t') k++;
                            DBG(printf("header '%s' => '%s'\n", bol, k));
                            if (!strcmp(bol, "content-length")) {
                                c->attr |= CONTENT_LENGTH;
                                c->content_length = atol(k);
                            }
                            if (!strcmp(bol, "content-type")) {
                                char *l = k;
                                while (*l) { if (*l >= 'A' && *l <= 'Z') *l |= 0x20; l++; }
                                c->attr |= CONTENT_TYPE;
                                if (c->content_type) free(c->content_type);
                                c->content_type = strdup(k);
                                if (!strncmp(k, "application/x-www-form-urlencoded", 33))
                                    c->attr |= CONTENT_FORM_UENC;
                            }
                            if (!strcmp(bol, "host"))
                                c->attr |= HOST_HEADER;
                            if (!strcmp(bol, "connection")) {
                                char *l = k;
                                while (*l) { if (*l >= 'A' && *l <= 'Z') *l |= 0x20; l++; }
                                if (!strncmp(k, "close", 5))
                                    c->attr |= CONNECTION_CLOSE;
                            }
                        }
                    }
                }
            }
        }
        if (c->part < PART_BODY) {
            /* we end here if the buffer consisted of complete lines only */
            c->line_pos = 0;
            return;
        }
    }
    if (c->part == PART_BODY && c->body) { /* BODY - this branch always returns */
        if (c->body_pos < c->content_length) { /* need to receive more ? */
            DBG(printf("BODY: body_pos=%d, content_length=%ld\n", c->body_pos, c->content_length));
            ssize_t n = recv(c->sock, c->body + c->body_pos,
                             c->content_length - c->body_pos, 0);
            DBG(printf(" [recv n=%d - had %u of %lu]\n", n, c->body_pos, c->content_length));
            c->line_pos = 0;
            if (n < 0) { /* error, scrap this worker */
                remove_worker(c);
                return;
            }
            if (n == 0) { /* connection closed -> try to process and then remove */
                process_request(c);
                remove_worker(c);
                return;
            }
            c->body_pos += n;
        }
        if (c->body_pos == c->content_length) { /* yay! we got the whole body */
            process_request(c);
            if (c->attr & CONNECTION_CLOSE || c->line_pos) { /* we have to close the connection if there was a double-hit */
                remove_worker(c);
                return;
            }
            /* keep-alive - reset the worker so it can process a new request */
            if (c->url) { free(c->url); c->url = NULL; }
            if (c->body) { free(c->body); c->body = NULL; }
            if (c->content_type) { free(c->content_type); c->content_type = NULL; }
            if (c->headers) { free_buffer(c->headers); c->headers = NULL; }
            c->line_pos = 0; c->body_pos = 0;
            c->method = 0;
            c->part = PART_REQUEST;
            c->attr = 0;
            c->content_length = 0;
            return;
        }
    }
    /* we enter here only if recv was used to leave the headers with no body */
    if (c->part == PART_BODY && !c->body) {
        char *s = c->line_buf;
        if (c->line_pos > 0) {
            if ((s[0] != '\r' || s[1] != '\n') && (s[0] != '\n')) {
                send_http_response(c, " 411 length is required for non-empty body\r\nConnection: close\r\n\r\n");
                remove_worker(c);
                return;
            }
            /* empty body, good */
            process_request(c);
            if (c->attr & CONNECTION_CLOSE) {
                remove_worker(c);
                return;
            } else { /* keep-alive */
                int sh = 1; /* length of the line break to skip */
                if (s[0] == '\r') sh++;
                if (c->line_pos <= sh)
                    c->line_pos = 0;
                else { /* shift the remaining buffer */
                    memmove(c->line_buf, c->line_buf + sh, c->line_pos - sh);
                    c->line_pos -= sh;
                }
                /* keep-alive - reset the worker so it can process a new request */
                if (c->url) { free(c->url); c->url = NULL; }
                if (c->body) { free(c->body); c->body = NULL; }
                if (c->content_type) { free(c->content_type); c->content_type = NULL; }
                if (c->headers) { free_buffer(c->headers); c->headers = NULL; }
                c->body_pos = 0;
                c->method = 0;
                c->part = PART_REQUEST;
                c->attr = 0;
                c->content_length = 0;
                return;
            }
        }
        ssize_t n = recv(c->sock, c->line_buf + c->line_pos,
                         LINE_BUF_SIZE - c->line_pos - 1, 0);
        if (n < 0) { /* error, scrap this worker */
            remove_worker(c);
            return;
        }
        if (n == 0) { /* connection closed -> try to process and then remove */
            process_request(c);
            remove_worker(c);
            return;
        }
        if ((s[0] != '\r' || s[1] != '\n') && (s[0] != '\n')) {
            send_http_response(c, " 411 length is required for non-empty body\r\nConnection: close\r\n\r\n");
            remove_worker(c);
            return;
        }
    }
}
/* forward declaration - the handler that accepts connections on
   srv_sock is defined further below */
static void srv_input_handler(void *data);
/* the server (listening) socket; holds INVALID_SOCKET while no server
   is running */
static SOCKET srv_sock = INVALID_SOCKET;
#ifdef _WIN32
/* Windows implementation uses threads to accept and serve
connections, using the main event loop to synchronize with R
through a message-only window which is created on the R thread
*/
/* Window procedure of the message-only window.
   A WM_RHTTP_CALLBACK message addressed to message_window carries the
   worker connection in lParam; that request is processed here and 0 is
   returned. Every other message is passed on to DefWindowProc. */
static LRESULT CALLBACK RhttpdWindowProc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam)
{
    DBG(Rprintf("RhttpdWindowProc(%x, %x, %x, %x)\n", (int) hwnd, (int) uMsg, (int) wParam, (int) lParam));

    /* not one of ours -> default handling */
    if (hwnd != message_window || uMsg != WM_RHTTP_CALLBACK)
        return DefWindowProc(hwnd, uMsg, wParam, lParam);

    process_request_main_thread((httpd_conn_t*) lParam);
    return 0;
}
/* server thread - keeps calling the accept handler (which creates the
   worker threads) for as long as the server socket is valid;
   lpParameter is handed to the handler unchanged
*/
static DWORD WINAPI ServerThreadProc(LPVOID lpParameter) {
    for (;;) {
        if (srv_sock == INVALID_SOCKET) /* server was shut down */
            break;
        srv_input_handler(lpParameter);
    }
    return 0;
}
/* worker thread - processes one client connection socket.
   lpParameter is the httpd_conn_t of the connection; a NULL value makes
   the thread exit immediately. Always returns 0. */
static DWORD WINAPI WorkerThreadProc(LPVOID lpParameter) {
    httpd_conn_t *conn = (httpd_conn_t*) lpParameter;

    if (conn) {
        /* serve input until the handler asks for disposal */
        while (!(conn->attr & THREAD_DISPOSE)) {
            /* flag the worker as owned by this thread so the handler
               does not remove it while we still need it */
            conn->attr |= THREAD_OWNED;
            worker_input_handler(conn);
        }
        /* the handler signalled a desire to remove the worker:
           reset the flags and free it */
        conn->attr = 0;
        remove_worker(conn);
    }
    return 0;
}
/* global server thread - currently we support only one server at a time.
   The handle is set by in_R_HTTPDCreate() and reset to 0 when the
   server thread is stopped. */
HANDLE server_thread;
#else
/* on unix we register all used sockets (server and workers) as input
 * handlers such that we can avoid polling */
/* global input handler for the server socket - registered by
 * in_R_HTTPDCreate() and removed by in_R_HTTPDStop() */
static InputHandler *srv_handler;
#endif
/* Accept one pending connection on the server socket and set up a
 * worker (httpd_conn_t) for it.
 *
 * On unix the client socket is registered as an input handler that
 * dispatches to worker_input_handler; on Windows a dedicated worker
 * thread is created for the connection instead.
 *
 * @param data not used by this function
 */
static void srv_input_handler(void *data)
{
    httpd_conn_t *c;
    SAIN peer_sa;
    socklen_t peer_sal = sizeof(peer_sa);
    SOCKET cl_sock = accept(srv_sock, (SA*) &peer_sa, &peer_sal);

    if (cl_sock == INVALID_SOCKET) /* accept failed, don't bother */
        return;

    c = (httpd_conn_t*) calloc(1, sizeof(httpd_conn_t));
    if (!c) {
        /* out of memory: drop the connection instead of dereferencing
           NULL and leaking the accepted socket */
        closesocket(cl_sock);
        return;
    }
    c->sock = cl_sock;
    c->peer = peer_sa.sin_addr;
#ifndef _WIN32
    c->ih = addInputHandler(R_InputHandlers, cl_sock, &worker_input_handler,
                            HttpdWorkerActivity);
    /* let the handler find its worker again */
    if (c->ih) c->ih->userData = c;
    /* NOTE(review): the result of add_worker() is ignored here;
       presumably it disposes of a rejected worker itself - confirm */
    add_worker(c);
#else
    if (!add_worker(c)) { /* create worker thread only if the worker
                           * was accepted */
        if (!(c->thread = CreateThread(NULL, 0, WorkerThreadProc,
                                       (LPVOID) c, 0, 0)))
            remove_worker(c); /* no thread -> nobody would serve it */
    }
#endif
}
/* Create the server socket, bind it to ip:port and start accepting
 * connections. If a server socket already exists it is closed first
 * (only one server is supported at a time).
 *
 * @param ip   address to bind to, passed on to build_sin()
 * @param port TCP port to listen on
 * @return 0 on success, -2 if the address/port is already in use.
 *         All other failures are signalled with Rf_error(); in every
 *         failure case the new socket is closed and srv_sock is left
 *         as INVALID_SOCKET.
 */
int in_R_HTTPDCreate(const char *ip, int port)
{
#ifndef _WIN32
    int reuse = 1;
#endif
    SAIN srv_sa;

    if (needs_init) /* initialization may need to be performed on first use */
        first_init();

    /* is already in use, close the current socket */
    if (srv_sock != INVALID_SOCKET)
        closesocket(srv_sock);

#ifdef _WIN32
    /* on Windows stop the server thread if it exists */
    if (server_thread) {
        DWORD ts = 0;
        if (GetExitCodeThread(server_thread, &ts) && ts == STILL_ACTIVE)
            TerminateThread(server_thread, 0);
        server_thread = 0;
    }
#else
    /* unregister the handler of the previous socket right away, so that
       none of the failure paths below leaves a handler registered for a
       descriptor that has already been closed */
    if (srv_handler) {
        removeInputHandler(&R_InputHandlers, srv_handler);
        srv_handler = NULL;
    }
#endif

    /* create a new socket */
    srv_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (srv_sock == INVALID_SOCKET)
        Rf_error("unable to create socket");

#ifndef _WIN32
    /* set socket for reuse so we can re-init if we die */
    /* But on Windows, this lets us stomp on any port already in use, so don't do it. */
    setsockopt(srv_sock, SOL_SOCKET, SO_REUSEADDR,
               (const char*)&reuse, sizeof(reuse));
#endif

    /* bind to the desired port */
    if (bind(srv_sock, build_sin(&srv_sa, ip, port), sizeof(srv_sa))) {
        /* look at the error code before closesocket() can change it */
        int in_use = (sockerrno == EADDRINUSE);
        closesocket(srv_sock);
        srv_sock = INVALID_SOCKET;
        if (in_use)
            return -2;
        Rf_error("unable to bind socket to TCP port %d", port);
    }

    /* setup listen */
    if (listen(srv_sock, 8)) {
        /* don't leak the socket or keep a non-listening server socket */
        closesocket(srv_sock);
        srv_sock = INVALID_SOCKET;
        Rf_error("cannot listen to TCP port %d", port);
    }

#ifndef _WIN32
    /* all went well, register the socket as a handler */
    srv_handler = addInputHandler(R_InputHandlers, srv_sock,
                                  &srv_input_handler, HttpdServerActivity);
#else
    /* do the desired Windows synchronization */
    server_thread = CreateThread(NULL, 0, ServerThreadProc, 0, 0, 0);
#endif
    return 0;
}
/* Stop the HTTP server: close the server socket (if any) and tear down
 * what feeds it - the server thread on Windows, the input handler on
 * unix. Calling it when no server is running does nothing. */
void in_R_HTTPDStop(void)
{
    if (srv_sock != INVALID_SOCKET) closesocket(srv_sock);
    srv_sock = INVALID_SOCKET;

#ifdef _WIN32
    /* on Windows stop the server thread if it exists */
    if (server_thread) {
        DWORD ts = 0;
        if (GetExitCodeThread(server_thread, &ts) && ts == STILL_ACTIVE)
            TerminateThread(server_thread, 0);
        server_thread = 0;
    }
#else
    if (srv_handler) {
        removeInputHandler(&R_InputHandlers, srv_handler);
        /* forget the handler so that a later stop/create does not try
           to remove the same (now stale) pointer a second time */
        srv_handler = NULL;
    }
#endif
}
/* Create an internal http server in R. Note that currently there can
   only be at most one http server running at any given time so the
   behavior is undefined if a server already exists (currently any
   previous servers will be shut down by this call but the shutdown
   may not be clean).
   @param sIP is the IP to bind to (or NULL for any); if given it must
   be a character vector of length one
   @param sPort is the TCP port number to bind to; it is coerced with
   asInteger() and must lie in 0:65535
   @return returns an integer value -- 0L on success, other values
   denote failures: -2L means that the address/port combination is
   already in use. Invalid arguments raise an R error.
*/
SEXP R_init_httpd(SEXP sIP, SEXP sPort)
{
    const char *ip = 0;
    int port;

    if (sIP != R_NilValue) {
        if (TYPEOF(sIP) != STRSXP || LENGTH(sIP) != 1)
            Rf_error("invalid bind address specification");
        ip = CHAR(STRING_ELT(sIP, 0));
    }

    /* reject NA (NA_INTEGER is negative) and anything that does not fit
       a 16-bit TCP port before it reaches the socket layer */
    port = asInteger(sPort);
    if (port < 0 || port > 65535)
        Rf_error("invalid port number %d: should be in 0:65535", port);

    return ScalarInteger(in_R_HTTPDCreate(ip, port));
}