2

Here's the situation: I'm making a proxy (for the moment running on my laptop) which deals with HTTP(S) requests from incoming clients (for the moment only one client, i.e. my Chrome browser on my laptop). It works without trouble, but it is a best-effort proxy, i.e. it does not give some types of traffic priority over others (streaming over SMTP, for example).

I thought of three classes of priority (minimum:1, medium:2 and maximum:3) and, since my traffic has two directions (LAN -> proxy -> WAN and WAN -> proxy -> LAN), I thought about six priority queues like these (the FIFO nature of each queue won't cause starvation of packets):

// Six FIFO queues, one per (direction, priority class) pair; the numeric
// suffix of each name is the priority class.
// NOTE(review): the ordering comments below treat 3 as the maximum priority,
// while algoQueues() labels case 1 as MAXIMUM and serves queue 1 first --
// confirm which numbering is intended.
// LAN -> proxy -> WAN, from max to min priority
queue queue_to_remote_3;
queue queue_to_remote_2;
queue queue_to_remote_1;
// WAN -> proxy -> LAN, from max to min priority
queue queue_to_local_3;
queue queue_to_local_2;
queue queue_to_local_1;

This is the pseudocode (C-like) of the algorithm I made for dealing with incoming packets from both LAN and WAN. I'll post just the part dealing with LAN -> proxy -> WAN, since WAN -> proxy -> LAN follows the same reasoning, except that it uses queue_to_local_X instead of queue_to_remote_X:

/*
 * Decide what to forward when a packet arrives.
 *
 * pkt       - the packet that has just been received.
 * direction - TO_REMOTE (LAN -> proxy -> WAN) or TO_LOCAL
 *             (WAN -> proxy -> LAN); only TO_REMOTE is shown here. Any other
 *             value falls through both branches and nothing is forwarded.
 * priority  - priority class of pkt, 1..3. Inside this function queue 1 is
 *             always served first, then queue 2, then queue 3.
 *             NOTE(review): the text around this function describes 3 as the
 *             maximum priority; confirm which numbering is intended.
 *
 * For a packet of class p the rule is:
 *   - if queues 1..p are all empty, forward pkt immediately;
 *   - otherwise append pkt to queue p and forward the head of the first
 *     non-empty queue among 1..p instead.
 *
 * An unknown priority in the TO_REMOTE branch prints a message and terminates
 * the process with EXIT_FAILURE.
 *
 * Known limitation: a packet is only pushed when one of the inspected queues
 * is already non-empty, so when all queues start out empty this function
 * never enqueues anything and every packet passes straight through.
 */
void algoQueues(packet pkt, int direction, int priority) {
    if (direction == TO_REMOTE) {
        switch (priority) {
            case 1: { // MAXIMUM priority
                // if queue1 is empty, just let it pass
                if (queue_to_remote_1.empty()) {
                    letPass(pkt);
                }
                // if queue1 is not empty, push the arrived pkt and
                // pop another pkt from the queue to let it pass
                // (FIFO: the oldest queued pkt goes out first)
                else {
                    queue_to_remote_1.push(pkt);
                    packet new_pkt = queue_to_remote_1.pop();
                    letPass(new_pkt);
                }
                break;
            }
            case 2: { // MEDIUM priority
                // first check if queue1 has pkts to send
                if (queue_to_remote_1.empty()) {
                    // if queue1 is empty, then check queue2
                    if (queue_to_remote_2.empty()) {
                        // if queue2 is empty too, just let the packet pass
                        letPass(pkt);
                    }
                    // if queue1 is empty and queue2 is not empty,
                    // push the new arrived pkt in queue2 and send
                    // a queued pkt from queue2
                    else {
                        queue_to_remote_2.push(pkt);
                        packet new_pkt = queue_to_remote_2.pop();
                        letPass(new_pkt);
                    }
                }
                // if queue1 is not empty, I must queue the new arrived pkt
                // in queue 2 and send a queued pkt from queue1
                else {
                    queue_to_remote_2.push(pkt);
                    packet new_pkt = queue_to_remote_1.pop();
                    letPass(new_pkt);
                }
                break;
            }
            case 3: { // MINIMUM priority
                // first check if queue1 has pkts to send
                if (queue_to_remote_1.empty()) {
                    // if queue1 is empty, then check queue2
                    if (queue_to_remote_2.empty()) {
                        // if queue3 is empty, just let it pass
                        if (queue_to_remote_3.empty()) {
                            letPass(pkt);
                        }
                        // if queue3 is not empty, push the arrived pkt
                        // in queue3 and pop another pkt from the
                        // same queue to let it pass
                        else {
                            queue_to_remote_3.push(pkt);
                            packet new_pkt = queue_to_remote_3.pop();
                            letPass(new_pkt);
                        }
                    }
                    // if queue2 is not empty and queue1 is empty, push the
                    // arrived pkt in queue3, get a pkt from queue2 and send it
                    else {
                        queue_to_remote_3.push(pkt);
                        packet new_pkt = queue_to_remote_2.pop();
                        letPass(new_pkt);
                    }
                }
                // if queue1 is not empty, queue the pkt in queue3
                // and send a pkt from queue1
                else {
                    queue_to_remote_3.push(pkt);
                    packet new_pkt = queue_to_remote_1.pop();
                    letPass(new_pkt);
                }
                break;
            }
            default: {
                // No break needed: exit() does not return.
                printf("To remote: unknown priority, strange...\n");
                exit(EXIT_FAILURE);
            }
        }
    }
    else if (direction == TO_LOCAL) {
        /* ... */
    }
}

A smart observer might object: "when do you insert the first packet???", and that is my issue; the first time this algorithm is called, all queues are empty, so every packet will just pass!

I can't push the first packet of any priority into its queue and break from the switch statement just to unlock all the other ifs; that sounds stupid. Is my reasoning (and thus my algorithm) wrong, or am I using the wrong data structure?

elmazzun
  • 271
  • 2
  • 7
  • Do you want QOS proxies in both directions? What is the QOS scenario you want to achieve? – Kasper van den Berg Apr 10 '16 at 14:46
  • Do you want client B's low or medium priority requests to wait for client A's high priority requests? Or should every connection have its own independent queues? – Kasper van den Berg Apr 10 '16 at 14:51
  • My final scenario is to get a working proxy with QOS implemented on my Raspberry Pi 2 acting as Access Point. As for your second comment, I sincerely don't know how to answer: I don't know the pros and cons of the alternatives you proposed about client priority requests. I have not yet thought about these variables. – elmazzun Apr 10 '16 at 15:00

2 Answers

0

QOS Algorithm

If possible, you can solve this with multiple threads (or processes):

  1. One thread/process receives requests and puts the request into the queue with the right priority.
  2. Another thread/process blocks waiting for requests arriving in any of the queues.

    When the thread/process wakes, it sends the messages that are in the queues, first checking the highest priority queue, then the next priority, and so on.

    When there are no messages left in any queue the thread/process blocks again.

Concurrent Message queue structure

The OS may provide pipes, sockets, files, or messages queues. You can use these existing structures as queues for inter process communication:

The receiver thread(s)/process(es) write the request to the pipe, socket, file or message queue.

The listening thread monitors all queue pipes/sockets/files simultaneously. Linux provides select for this; Windows (Embedded) probably has something similar — perhaps GetQueuedCompletionStatus.

When using threads you have shared memory available. If you have atomic 'compare and set' operations you can implement (or use an existing implementation of) a concurrent non-blocking queue (here is the Java source to use as inspiration).

If you do not have atomic operations but do have locks/semaphores/… you can use a regular queue/linked list/cyclic buffer and use locks to ensure the pointers are updated consistently.

Kasper van den Berg
  • 2,636
  • 1
  • 16
  • 32
  • The thing is, mine is a forking proxy: for every client connecting, the proxy calls `accept()` and then `fork()`s to manage the client's requests. So every child process should have threads... is it a good thing to have multiple threads inside a forked process? – elmazzun Apr 10 '16 at 12:59
  • Plus, I created six queues of priority for the two directions of traffic in order to not deal with race conditions, every queue has a single priority and a single direction. – elmazzun Apr 10 '16 at 13:19
  • I split the insertion and the removal from the queues: @Kasper van den Berg gave me a hint when saying I may have one thread/process to push() and another thread/process to pop() from the queues. I don't know if this is the most efficient way, but it works while keeping a tab with Netflix and another tab with YouTube open (with maximum and medium priority respectively). I'll accept your answer because it gave me the hint to separate the pushing from the popping in queues, but I'll answer my own question by posting the working pseudocode. Thanks Kasper :) – elmazzun Apr 10 '16 at 16:08
0

Working code (not pseudocode as I said, duh! but this is more explanatory): I have two file descriptors, one for the client and another for the server, both opened with socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); select() checks whether the client or the server sent something, and the fd_set is used only for reading (from client and server).

Warning: I'm not sure whether I close the file descriptors correctly where I have to, or whether there are any conceptual mistakes, but this works for me: select(), check the client and the server for readability, recv(), push() in queue, pop() from queue and send().

// Relay loop for one client/server socket pair. Each iteration waits until
// either socket is readable, receives one chunk, runs it through the priority
// queues and forwards the dequeued data to the opposite socket. Any error,
// a closed connection or a 30-second idle timeout closes both sockets and
// leaves the loop.

// First argument for select(): highest descriptor being watched, plus one.
int maxp = server_fd >= client_fd ? server_fd+1 : client_fd+1;
int r;
int priority = 0;
int read_from_client = 0;
int read_from_server = 0;
int send_to_client = 0;
int send_to_server = 0;
struct timeval timeout;
char https_buf[1500];
int https_buf_size = sizeof(https_buf);
fd_set fdset;

while(true) {
    memset(https_buf, 0, https_buf_size);

    // select() overwrites the set with the ready descriptors, so it has to be
    // rebuilt before every call.
    FD_ZERO(&fdset);
    FD_SET(client_fd, &fdset);
    FD_SET(server_fd, &fdset);

    // Re-arm the timeout on every pass; select() is allowed to modify it.
    timeout.tv_sec = 30;
    timeout.tv_usec = 0;

    // Only the read set is watched (from client and from server).
    r = select(maxp, &fdset, NULL, NULL, &timeout);

    if (r < 0) {
        perror("select()");
        close(client_fd);
        client_fd = -1;
        close(server_fd);
        server_fd = -1;
        break;
    }

    if (r == 0) { // select timed out
        close(client_fd);
        client_fd = -1;
        close(server_fd);
        server_fd = -1;
        break;
    }

    // Client -> server direction.
    if (FD_ISSET(client_fd, &fdset)) {
        read_from_client = recv(client_fd, https_buf, https_buf_size, 0);
        if (read_from_client < 0) {
            perror("recv():");
            close(client_fd);
            client_fd = -1;
            close(server_fd);
            server_fd = -1;
            break;
        }
        else if (read_from_client == 0) {
            // connection closed
            close(client_fd);
            client_fd = -1;
            close(server_fd);
            server_fd = -1;
            break;
        }
        // presumably priority is part of the "useful data" handed to
        // insertInQueue() -- its arguments are elided here
        priority = whatPriorityHasPkt();
        insertInQueue(/* useful data */);
        // specify the direction and the buffer
        // where to write data
        removeFromQueue(TO_REMOTE, https_buf);
        // NOTE(review): read_from_client is the size of the chunk just
        // received; if removeFromQueue() can hand back a different queued
        // packet, its own length should be sent instead -- confirm.
        send_to_server = send(server_fd, https_buf, read_from_client, 0);
        // A failed send ends the session the same way a failed recv does.
        if (send_to_server < 0) {
            perror("send()");
            close(client_fd);
            client_fd = -1;
            close(server_fd);
            server_fd = -1;
            break;
        }
    }

    // Server -> client direction.
    if (FD_ISSET(server_fd, &fdset)) {
        read_from_server = recv(server_fd, https_buf, https_buf_size, 0);
        if (read_from_server < 0) {
            perror("recv:");
            close(client_fd);
            client_fd = -1;
            close(server_fd);
            server_fd = -1;
            break;
        }
        else if (read_from_server == 0) {
            // connection closed
            close(client_fd);
            client_fd = -1;
            close(server_fd);
            server_fd = -1;
            break;
        }
        priority = whatPriorityHasPkt();
        insertInQueue(/* useful data */);
        removeFromQueue(TO_LOCAL, https_buf);
        // NOTE(review): same length caveat as in the client -> server
        // direction -- confirm against removeFromQueue().
        send_to_client = send(client_fd, https_buf, read_from_server, 0);
        // A failed send ends the session the same way a failed recv does.
        if (send_to_client < 0) {
            perror("send()");
            close(client_fd);
            client_fd = -1;
            close(server_fd);
            server_fd = -1;
            break;
        }
    }
}
elmazzun
  • 271
  • 2
  • 7