From 08ec53485bb6e0ecca77489ebb24110740496ff3 Mon Sep 17 00:00:00 2001
From: Ronan LE MEILLAT <rtkgps@outlook.com>
Date: Sat, 4 Aug 2018 08:38:00 +0200
Subject: [PATCH] Add support for unix socket stream #15

---
 src/rtklib.h |   1 +
 src/stream.c | 234 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 234 insertions(+), 1 deletion(-)

diff --git a/src/rtklib.h b/src/rtklib.h
index c7f52ce..43d8a73 100644
--- a/src/rtklib.h
+++ b/src/rtklib.h
@@ -432,6 +432,7 @@ extern "C" {
 #define STR_UDPSVR   12                 /* stream type: UDP server */
 #define STR_UDPCLI   13                 /* stream type: UDP server */
 #define STR_MEMBUF   14                 /* stream type: memory buffer */
+#define STR_UNIXSVR  15                 /* stream type: unix domain socket server */
 
 #define STRFMT_RTCM2 0                  /* stream format: RTCM 2 */
 #define STRFMT_RTCM3 1                  /* stream format: RTCM 3 */
diff --git a/src/stream.c b/src/stream.c
index d84f782..816d3c5 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -73,6 +73,7 @@
 #include <errno.h>
 #include <termios.h>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <arpa/inet.h>
@@ -120,6 +121,10 @@ typedef int socklen_t;
 #define closesocket         close
 #endif
 
+#ifndef SUN_LEN
+#define SUN_LEN(ptr) ((size_t) (((struct sockaddr_un *) 0)->sun_path) + strlen((ptr)->sun_path))
+#endif
+
 /* type definition -----------------------------------------------------------*/
 
 typedef struct {            /* file control type */
@@ -238,6 +243,22 @@ typedef struct {            /* ftp download control type */
     thread_t thread;        /* download thread */
 } ftp_t;
 
+typedef struct {            /* unix domain socket control type */
+    int state;              /* state (0:close,1:wait,2:connect) */
+    char saddr[108];        /* address string */
+    int port;               /* port */
+    struct sockaddr_un addr; /* address resolved */
+    socket_t sock;          /* socket descriptor */
+    int tcon;               /* reconnect time (ms) (-1:never,0:now) */
+    unsigned int tact;      /* data active tick */
+    unsigned int tdis;      /* disconnect tick */
+} unix_t;
+
+typedef struct {            /* unix domain socket server type */
+    unix_t svr;              /* unix server control */
+    unix_t cli[MAXCLI];      /* unix client controls */
+} unixsvr_t;
+
 typedef struct {            /* memory buffer type */
     int state,wp,rp;        /* state,write/read pointer */
     int bufsize;            /* buffer size (bytes) */
@@ -2515,6 +2536,212 @@ static int statexftp(ftp_t *ftp, char *msg)
 {
     return !ftp?0:(ftp->state==0?2:(ftp->state<=2?3:-1));
 }
+
+/* generate unix socket -------------------------------------------------------*/
+static int genunix(unix_t *unx, char *msg)
+{
+    tracet(3,"genunix\n");
+
+    /* generate socket */
+    if ((unx->sock=socket(AF_LOCAL,SOCK_STREAM,0))==(socket_t)-1) {
+        sprintf(msg,"socket error (%d)",errsock());
+        tracet(1,"genunix: socket error err=%d\n",errsock());
+        unx->state=-1;
+        return 0;
+    }
+    if (!setsock(unx->sock,msg)) {
+        unx->state=-1;
+        return 0;
+    }
+    memset(&unx->addr,0,sizeof(unx->addr));
+    unx->addr.sun_family=AF_LOCAL;
+    strncpy(unx->addr.sun_path, unx->saddr, sizeof(unx->addr.sun_path)-1);
+
+    unlink(unx->addr.sun_path);
+
+    if (bind(unx->sock,(struct sockaddr *)&unx->addr,SUN_LEN(&unx->addr))==-1) {
+        sprintf(msg,"bind error (%d) ",errsock());
+        tracet(1,"genunix: bind error path=%s err=%d\n",unx->addr.sun_path,errsock());
+        closesocket(unx->sock);
+        unx->state=-1;
+        return 0;
+    }
+    listen(unx->sock,5);
+    unx->state=1;
+    unx->tact=tickget();
+    tracet(5,"genunx: exit sock=%d\n",unx->sock);
+    return 1;
+}
+/* disconnect unix socket ---------------------------------------------------*/
+static void disconunix(unix_t *unx, int tcon)
+{
+    tracet(3,"disconunix: sock=%d tcon=%d\n",unx->sock,tcon);
+
+    closesocket(unx->sock);
+    unx->state=0;
+    unx->tcon=tcon;
+    unx->tdis=tickget();
+}
+/* open unix domain socket server --------------------------------------------*/
+static unixsvr_t *openunixsvr(const char *path, char *msg)
+{
+    unixsvr_t *unixsvr,unixsvr0={{0}};
+
+    tracet(3,"openunixsvr: path=%s\n",path);
+
+    if (!(unixsvr=(unixsvr_t *)malloc(sizeof(unixsvr_t)))) return NULL;
+    *unixsvr=unixsvr0;
+    strncpy(unixsvr->svr.saddr, path, sizeof(unixsvr->svr.saddr)-1);
+    if (!genunix(&unixsvr->svr,msg)) {
+        free(unixsvr);
+        return NULL;
+    }
+    unixsvr->svr.tcon=0;
+    return unixsvr;
+}
+/* close unix server ----------------------------------------------------------*/
+static void closeunixsvr(unixsvr_t *unixsvr)
+{
+    int i;
+
+    tracet(3,"closeunixsvr:\n");
+
+    for (i=0;i<MAXCLI;i++) {
+        if (unixsvr->cli[i].state) closesocket(unixsvr->cli[i].sock);
+    }
+    closesocket(unixsvr->svr.sock);
+    unlink(unixsvr->svr.addr.sun_path);
+    free(unixsvr);
+}
+/* update unix server ---------------------------------------------------------*/
+static void updateunixsvr(unixsvr_t *unixsvr, char *msg)
+{
+    char saddr[256]="";
+    int i,j,n=0;
+
+    tracet(3,"updateunixsvr: state=%d\n",unixsvr->svr.state);
+
+    if (unixsvr->svr.state==0) return;
+
+    for (i=0;i<MAXCLI;i++) {
+        if (unixsvr->cli[i].state) continue;
+        for (j=i+1;j<MAXCLI;j++) {
+            if (!unixsvr->cli[j].state) continue;
+            unixsvr->cli[i]=unixsvr->cli[j];
+            unixsvr->cli[j].state=0;
+            break;
+        }
+    }
+    for (i=0;i<MAXCLI;i++) {
+        if (!unixsvr->cli[i].state) continue;
+        memcpy(saddr,unixsvr->cli[i].saddr, sizeof(unixsvr->cli[i].saddr));
+        n++;
+    }
+    if (n==0) {
+        unixsvr->svr.state=1;
+        sprintf(msg,"waiting...");
+        return;
+    }
+    unixsvr->svr.state=2;
+    if (n==1) sprintf(msg,"%s",saddr); else sprintf(msg,"%d clients",n);
+}
+/* accept client connection --------------------------------------------------*/
+static int accunixsock(unixsvr_t *unixsvr, char *msg)
+{
+    struct sockaddr_un addr;
+    socket_t sock;
+    socklen_t len=sizeof(addr);
+    int i,err;
+
+    tracet(3,"accunixsock: sock=%d\n",unixsvr->svr.sock);
+
+    for (i=0;i<MAXCLI;i++) if (unixsvr->cli[i].state==0) break;
+    if (i>=MAXCLI) return 0; /* too many client */
+
+    if ((sock=accept_nb(unixsvr->svr.sock,(struct sockaddr *)&addr,&len))==(socket_t)-1) {
+        err=errsock();
+        sprintf(msg,"accept error (%d)",err);
+        tracet(1,"accunixsock: accept error sock=%d err=%d\n",unixsvr->svr.sock,err);
+        closesocket(unixsvr->svr.sock); unixsvr->svr.state=0;
+        unlink(unixsvr->svr.addr.sun_path);
+        return 0;
+    }
+    if (sock==0) return 0;
+
+    unixsvr->cli[i].sock=sock;
+    if (!setsock(unixsvr->cli[i].sock,msg)) return 0;
+    memcpy(&unixsvr->cli[i].addr,&addr,sizeof(addr));
+    memcpy(unixsvr->cli[i].saddr,addr.sun_path, sizeof(addr.sun_path));
+    sprintf(msg,"%s",unixsvr->cli[i].saddr);
+    tracet(2,"accunixsock: connected sock=%d addr=%s\n",unixsvr->cli[i].sock,unixsvr->cli[i].saddr);
+    unixsvr->cli[i].state=2;
+    unixsvr->cli[i].tact=tickget();
+    return 1;
+}
+/* wait socket accept --------------------------------------------------------*/
+static int waitunixsvr(unixsvr_t *unixsvr, char *msg)
+{
+    tracet(4,"waitunixsvr: sock=%d state=%d\n",unixsvr->svr.sock,unixsvr->svr.state);
+
+    if (unixsvr->svr.state<=0) return 0;
+
+    while (accunixsock(unixsvr,msg)) ;
+
+    updateunixsvr(unixsvr,msg);
+    return unixsvr->svr.state==2;
+}
+/* read unix server ----------------------------------------------------------*/
+static int readunixsvr(unixsvr_t *unixsvr, unsigned char *buff, int n, char *msg)
+{
+    int nr,err;
+
+    tracet(4,"readunixsvr: state=%d n=%d\n",unixsvr->svr.state,n);
+
+    if (!waitunixsvr(unixsvr,msg)||unixsvr->cli[0].state!=2) return 0;
+
+    if ((nr=recv_nb(unixsvr->cli[0].sock,buff,n))==-1) {
+        err=errsock();
+        tracet(1,"readunixsvr: recv error sock=%d err=%d\n",unixsvr->cli[0].sock,err);
+        sprintf(msg,"recv error (%d)",err);
+        disconunix(&unixsvr->cli[0],ticonnect);
+        updateunixsvr(unixsvr,msg);
+        return 0;
+    }
+    if (nr>0) unixsvr->cli[0].tact=tickget();
+    tracet(5,"readunixsvr: exit sock=%d nr=%d\n",unixsvr->cli[0].sock,nr);
+    return nr;
+}
+/* write unix server ---------------------------------------------------------*/
+static int writeunixsvr(unixsvr_t *unixsvr, unsigned char *buff, int n, char *msg)
+{
+    int i,ns=0,err;
+
+    tracet(3,"writeunixsvr: state=%d n=%d\n",unixsvr->svr.state,n);
+
+    if (!waitunixsvr(unixsvr,msg)) return 0;
+
+    for (i=0;i<MAXCLI;i++) {
+        if (unixsvr->cli[i].state!=2) continue;
+
+        if ((ns=send_nb(unixsvr->cli[i].sock,buff,n))==-1) {
+            err=errsock();
+            tracet(1,"writeunixsvr: send error i=%d sock=%d err=%d\n",i,unixsvr->cli[i].sock,err);
+            sprintf(msg,"send error (%d)",err);
+            disconunix(&unixsvr->cli[i],ticonnect);
+            updateunixsvr(unixsvr,msg);
+            return 0;
+        }
+        if (ns>0) unixsvr->cli[i].tact=tickget();
+        tracet(5,"writeunixsvr: send i=%d ns=%d\n",i,ns);
+    }
+    return ns;
+}
+/* get state unix server -----------------------------------------------------*/
+static int stateunixsvr(unixsvr_t *unixsvr)
+{
+    return unixsvr?unixsvr->svr.state:0;
+}
+
 /* open memory buffer --------------------------------------------------------*/
 static membuf_t *openmembuf(const char *path, char *msg)
 {
@@ -2757,7 +2984,7 @@ extern void strinit(stream_t *stream)
 *                    tint  = download interval (s)
 *                    toff  = download time offset (s)
 *                    tret  = download retry interval (s) (0:no retry)
-*
+*   STR_UNIXSVR  path
 *-----------------------------------------------------------------------------*/
 extern int stropen(stream_t *stream, int type, int mode, const char *path)
 {
@@ -2785,6 +3012,7 @@ extern int stropen(stream_t *stream, int type, int mode, const char *path)
         case STR_MEMBUF  : stream->port=openmembuf(path,     stream->msg); break;
         case STR_FTP     : stream->port=openftp   (path,0,   stream->msg); break;
         case STR_HTTP    : stream->port=openftp   (path,1,   stream->msg); break;
+        case STR_UNIXSVR : stream->port=openunixsvr(path,    stream->msg); break;
         default: stream->state=0; return 1;
     }
     stream->state=!stream->port?-1:1;
@@ -2816,6 +3044,7 @@ extern void strclose(stream_t *stream)
             case STR_MEMBUF  : closemembuf((membuf_t *)stream->port); break;
             case STR_FTP     : closeftp   ((ftp_t    *)stream->port); break;
             case STR_HTTP    : closeftp   ((ftp_t    *)stream->port); break;
+            case STR_UNIXSVR : closeunixsvr   ((unixsvr_t    *)stream->port); break;
         }
     }
     else {
@@ -2887,6 +3116,7 @@ extern int strread(stream_t *stream, unsigned char *buff, int n)
         case STR_MEMBUF  : nr=readmembuf((membuf_t *)stream->port,buff,n,msg); break;
         case STR_FTP     : nr=readftp   ((ftp_t    *)stream->port,buff,n,msg); break;
         case STR_HTTP    : nr=readftp   ((ftp_t    *)stream->port,buff,n,msg); break;
+        case STR_UNIXSVR : nr=readunixsvr((unixsvr_t    *)stream->port,buff,n,msg); break;
         default:
             strunlock(stream);
             return 0;
@@ -2928,6 +3158,7 @@ extern int strwrite(stream_t *stream, unsigned char *buff, int n)
         case STR_TCPCLI  : ns=writetcpcli((tcpcli_t *)stream->port,buff,n,msg); break;
         case STR_NTRIPSVR:
         case STR_NTRIPCLI: ns=writentrip ((ntrip_t  *)stream->port,buff,n,msg); break;
+        case STR_UNIXSVR : ns=writeunixsvr((unixsvr_t *)stream->port,buff,n,msg); break;
         case STR_NTRIPC_S:
         case STR_NTRIPC_C: ns=writentripc((ntripc_t *)stream->port,buff,n,msg); break;
         case STR_UDPCLI  : ns=writeudpcli((udp_t    *)stream->port,buff,n,msg); break;
@@ -3022,6 +3253,7 @@ extern int strstat(stream_t *stream, char *msg)
         case STR_MEMBUF  : state=statemembuf((membuf_t *)stream->port); break;
         case STR_FTP     : state=stateftp   ((ftp_t    *)stream->port); break;
         case STR_HTTP    : state=stateftp   ((ftp_t    *)stream->port); break;
+        case STR_UNIXSVR : state=stateunixsvr((unixsvr_t    *)stream->port); break;
         default:
             strunlock(stream);
             return 0;
-- 
GitLab