Skip to content

Commit

Permalink
initial commit, wokring server
Browse files Browse the repository at this point in the history
  • Loading branch information
elazarl committed Oct 21, 2012
0 parents commit eee6f7e
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 0 deletions.
106 changes: 106 additions & 0 deletions 10kthreads.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>

#include <pthread.h>

#include "rusage.h"
#include "atomic.h"

void panic(char *msg) {
perror(msg);
exit(EXIT_FAILURE);
}

typedef unsigned char byte;

atomic_int currConc = ATOMIC_INT(0);
atomic_int maxConc = ATOMIC_INT(0);
atomic_int total = ATOMIC_INT(0);

void *agent(void *_sock) {
int sock = (intptr_t)_sock;
byte buf[100];
int len = read(sock, buf, sizeof(buf));
if (len < 0) {
perror("read");
return NULL;
}
const char *msg = "ECHO: ";
if (write(sock, msg, strlen(msg)<0)) {
perror("write");
return NULL;
}
if (write(sock, buf, len)<0) {
return NULL;
}
if (close(sock)!=0) {
perror("close");
return NULL;
}
atomic_int_add(&currConc, -1);
return NULL;
}

int close_sock;
atomic_int is_exit = ATOMIC_INT(0);

void set_exit(int sig) {
puts("EXITING");
atomic_int_set(&is_exit, 1);
if (close(close_sock)!=0) {
perror("exit sock");
}
}

int main(int argc, char **argv) {
int sock;
int rv;
struct addrinfo *addr;
struct addrinfo *it = NULL;
struct addrinfo hint = {AF_UNSPEC, PF_INET, AI_PASSIVE};

signal(SIGINT, set_exit);
if ((rv = getaddrinfo(NULL, "1234", &hint, &addr))!=0) {
printf("rv=%s\n", gai_strerror(rv));
}
for (it = addr; it!=NULL; it = it->ai_next) {
sock = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
if (sock == -1) continue;

if (bind(sock, it->ai_addr, it->ai_addrlen) == 0) break;

close(sock);
perror("bind");
}
if (it==NULL) {
fprintf(stderr, "could not bind");
return -1;
}
listen(sock, 5);
close_sock = sock;
while (1) {
struct addrinfo client_addr;
socklen_t addr_len = sizeof(client_addr);
pthread_t t;
int conc;
int client_sock = accept(sock, (struct sockaddr *) &client_addr, &addr_len);
if (atomic_int_get(&is_exit)!=0) break;
if (client_sock==-1) panic("accept");
conc = atomic_int_add(&currConc, 1);
atomic_int_add(&total, 1);
while (conc>atomic_int_get(&maxConc) && !atomic_int_compare_and_swap(&maxConc, atomic_int_get(&maxConc), conc));
pthread_create(&t, NULL, agent, (void*)(intptr_t)client_sock);
}
// if we got here, we already close(sock);
freeaddrinfo(addr);
print_rusage();
printf("Max conc: %d\n", atomic_int_get(&maxConc));
printf("Total: %d\n", atomic_int_get(&total));
printf("Now: %d\n", atomic_int_get(&currConc));
return 0;
}
62 changes: 62 additions & 0 deletions 10kthreads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"fmt"
"io/ioutil"
"net"
"os"
"os/signal"
"sync/atomic"
)

func main() {
l, err := net.Listen("tcp", "localhost:1234")
if err != nil {
fmt.Println("listen:", err)
return
}
s := make(chan os.Signal)
closed := int32(0)
signal.Notify(s, os.Interrupt)
go func() {
<-s
atomic.StoreInt32(&closed, 1)
fmt.Println("closing")
l.Close()
}()
totalConc := int32(0)
maxConc := int32(0)
conc := int32(0)
for {
c, err := l.Accept()
if closed != 0 {
fmt.Println("closed")
break
}
if err != nil {
fmt.Println("accept:", err)
return
}
go func(c net.Conn) {
defer c.Close()
atomic.AddInt32(&totalConc, 1)
curr := atomic.AddInt32(&conc, 1)
for curr > maxConc && !atomic.CompareAndSwapInt32(&maxConc, maxConc, curr) {
}
defer atomic.AddInt32(&conc, -1)
b, err := ioutil.ReadAll(c)
if err != nil {
fmt.Println("read:", err)
return
}
if _, err := c.Write(b); err != nil {
fmt.Println("write:", err)
return
}
}(c)
}
fmt.Println("Done")
fmt.Println("Total connections:", totalConc)
fmt.Println("Max concurrent connections:", maxConc)
fmt.Println("Current connections", conc)
}
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CFLAGS=-Wall #-Wextra
10kthreads: 10kthreads.c rusage.o
gcc $(CFLAGS) -pthread $^ -o $@
rusage.o: rusage.c
gcc -c $(CFLAGS) $^ -o $@
5 changes: 5 additions & 0 deletions atomic.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#if __GNUC__ >= 4 && __GNUC_MINOR__ >= 1
#include "atomic_gcc.h"
#else
#include "atomic_pthread.h"
#endif
25 changes: 25 additions & 0 deletions atomic_gcc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#ifndef __ATOMIC_GCC_H_
#define __ATOMIC_GCC_H_
typedef struct atomic_int_t {
volatile int v;
} atomic_int;

static inline int atomic_int_set(atomic_int *a, int new_v) {
return __sync_lock_test_and_set(&a->v, new_v);
}

static inline int atomic_int_get(atomic_int *a) {
return a->v;
}

static inline int atomic_int_compare_and_swap(atomic_int *a, int old, int v) {
return __sync_bool_compare_and_swap(&a->v, old, v);
}

static inline int atomic_int_add(atomic_int *a, int v) {
return __sync_add_and_fetch(&a->v, v);
}

#define ATOMIC_INT(x) {x}

#endif
31 changes: 31 additions & 0 deletions atomic_pthread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#ifndef __ATOMIC_PTHREAD_H_
#define __ATOMIC_PTHREAD_H_
typedef struct atomic_int_t {
pthread_mutex_t mutex;
volatile int v;
} atomic_int;

#define ATOMIC(x) {PTHREAD_MUTEX_INITIALIZER, x};

static inline int atomic_int_set(atomic_int *a, int new_v) {
pthread_mutex_lock(&a->mutex);
int old_v = a->v;
a->v = new_v;
pthread_mutex_unlock(&a->mutex);
return old_v;
}

static inline int atomic_int_get(atomic_int *a) {
pthread_mutex_lock(&a->mutex);
int v = a->v;
pthread_mutex_unlock(&a->mutex);
return v;
}

static inline int atomic_int_compare_and_swap(atomic_int *a, int old, int v) {
pthread_mutex_lock(&a->mutex);
int v = a->v;
pthread_mutex_unlock(&a->mutex);
return 1;
}
#endif
22 changes: 22 additions & 0 deletions rusage.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>
#include "rusage.h"

void print_rusage(void) {
struct rusage u;

if (getrusage(RUSAGE_SELF,&u)==0) {
puts("resident ");
printf("%lu", u.ru_maxrss);
puts("\nshared ");
printf("%lu", u.ru_ixrss);
puts("\nunshared data ");
printf("%lu", u.ru_idrss);
puts("\nstack ");
printf("%lu\n", u.ru_isrss);
} else {
perror("getrusage");
}
}
1 change: 1 addition & 0 deletions rusage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
void print_rusage(void);
55 changes: 55 additions & 0 deletions slowreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"sync"
"time"
)

func panicOnErr(msg string, err error) {
if err!=nil {
panic(msg+": "+err.Error())
}
}

func main() {
nthreads := flag.Int("n", -1, "how many threads will open connection concurrently")
data := flag.String("data", "hello", "what will those threads write")
host := flag.String("host", "localhost:1234", "all threads connect to host")
flag.Parse()
if *nthreads<=0 {
fmt.Println("Must supply nthreads")
flag.Usage()
return
}

var step1, step2 sync.WaitGroup
step1.Add(*nthreads)
step2.Add(1)
for i:=0; i<*nthreads; i++ {
go func() {
conn, err := net.Dial("tcp", *host)
if err!=nil {
fmt.Println("dial:", err)
step1.Done()
step2.Wait()
return
}
//panicOnErr("dial", err)
defer conn.Close()
fmt.Fprint(conn, *data)
step1.Done()
step2.Wait()
_, err = io.Copy(ioutil.Discard, conn)
panicOnErr("copy", err)
}()
time.Sleep(time.Microsecond)
}
step1.Wait()
step2.Done()
}

31 changes: 31 additions & 0 deletions sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"fmt"
"sort"
)

type SortBy struct {
Data []string
IsLess func (l, r string) bool
}

func (sb SortBy) Less(i, j int) bool {
return sb.IsLess(sb.Data[i], sb.Data[j])
}

func (sb SortBy) Swap(i, j int) {
sb.Data[i], sb.Data[j] = sb.Data[j], sb.Data[i]
}

func (sb SortBy) Len() int {
return len(sb.Data)
}

func main() {
s := []string{"zzz", "a12345", "b", "cc", "d"};
sort.Sort(SortBy{s, func (l, r string) bool {return l<r}})
fmt.Println(s)
sort.Sort(SortBy{s, func (l, r string) bool {return len(l)<len(r)}})
fmt.Println(s)
}

0 comments on commit eee6f7e

Please sign in to comment.