Skip to content

Commit

Permalink
Applied Shravan's zlib compression patch
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnLangford committed Dec 31, 2009
2 parents cefb32e + 6dc15b3 commit 4ab9d6a
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 98 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
COMPILER = g++
LIBS = -l boost_program_options -l pthread
LIBS = -l boost_program_options -l pthread -l z
BOOST_INCLUDE = /usr/local/boost/include/boost-1_34_1
BOOST_LIBRARY = /usr/local/boost/lib

Expand All @@ -25,9 +25,9 @@ vw.o: parse_example.h parse_regressor.h parse_args.h parser.h

offset_tree.o: parse_example.h parse_regressor.h parse_args.h parser.h

parse_args.o: parse_regressor.h parse_example.h io.h gd.h
parse_args.o: parse_regressor.h parse_example.h io.h comp_io.h gd.h

parse_example.o: io.h parse_example.cc parser.h
parse_example.o: io.h comp_io.h parse_example.cc parser.h

sender.o: parse_example.h

Expand Down Expand Up @@ -57,7 +57,7 @@ vw-train: vw
@echo "TEST: vw training ..."
@rm -f test/train.dat.cache
@./vw -b 17 -l 20 --initial_t 128000 \
--power_t 1 -f test/t_r_temp -c --passes 2 -d test/train.dat
--power_t 1 -f test/t_r_temp -c --passes 2 -d test/train.dat --compressed

vw-test: vw-train
@echo
Expand Down
17 changes: 9 additions & 8 deletions cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,28 @@ int read_cached_features(parser* p, void* ec)
{
example* ae = (example*)ec;
size_t mask = global.mask;
io_buf* input = p->input;

size_t total = p->lp->read_cached_label(ae->ld, p->input);
size_t total = p->lp->read_cached_label(ae->ld, *input);
if (total == 0)
return 0;
if (read_cached_tag(p->input,ae) == 0)
if (read_cached_tag(*input,ae) == 0)
return 0;

char* c;
unsigned char num_indices = 0;
if (buf_read(p->input, c, sizeof(num_indices)) < sizeof(num_indices))
if (buf_read(*input, c, sizeof(num_indices)) < sizeof(num_indices))
return 0;
num_indices = *(unsigned char*)c;
c += sizeof(num_indices);

p->input.set(c);
p->input->set(c);

for (;num_indices > 0; num_indices--)
{
size_t temp;
unsigned char index = 0;
if((temp = buf_read(p->input,c,sizeof(index) + sizeof(size_t))) < sizeof(index) + sizeof(size_t)) {
if((temp = buf_read(*input,c,sizeof(index) + sizeof(size_t))) < sizeof(index) + sizeof(size_t)) {
cerr << "truncated example! " << temp << " " << char_size + sizeof(size_t) << endl;
return 0;
}
Expand All @@ -74,9 +75,9 @@ int read_cached_features(parser* p, void* ec)
v_array<feature>* ours = ae->atomics+index;
size_t storage = *(size_t *)c;
c += sizeof(size_t);
p->input.set(c);
p->input->set(c);
total += storage;
if (buf_read(p->input,c,storage) < storage) {
if (buf_read(*input,c,storage) < storage) {
cerr << "truncated example! wanted: " << storage << " bytes" << endl;
return 0;
}
Expand All @@ -102,7 +103,7 @@ int read_cached_features(parser* p, void* ec)
f.weight_index = f.weight_index & mask;
push(*ours, f);
}
p->input.set(c);
p->input->set(c);
}

return total;
Expand Down
96 changes: 96 additions & 0 deletions comp_io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2007 Yahoo! Inc. All rights reserved. The copyrights
* embodied in the content of this file are licensed under the BSD
* (revised) open source license
* */

#ifndef COMP_IO_BUF_H_
#define COMP_IO_BUF_H_

#include "io.h"
#include "v_array.h"
#include "zlib.h"

class comp_io_buf : public io_buf
{
public:
v_array<gzFile> gz_files;

comp_io_buf()
{
init();
}

virtual int open_file(const char* name, int flag=READ){
gzFile fil;
int ret = -1;
switch(flag){
case READ:
fil = gzopen(name, "rb");
if(fil!=NULL){
push(gz_files,fil);
ret = gz_files.index()-1;
push(files,ret);
}
else
ret = -1;
break;

case WRITE:
fil = gzopen(name, "wb");
if(fil!=NULL){
push(gz_files,fil);
ret = gz_files.index()-1;
push(files,ret);
}
else
ret = -1;
break;

default:
cerr << "Unknown file operation. Something other than READ/WRITE specified" << endl;
ret = -1;
}
return ret;
}

virtual void reset_file(int f){
gzFile fil = gz_files[f];
gzseek(fil, 0, SEEK_SET);
endloaded = space.begin;
space.end = space.begin;
}

virtual ssize_t read_file(int f, void* buf, size_t nbytes)
{
gzFile fil = gz_files[f];
int num_read = gzread(fil, buf, nbytes);
return (num_read > 0) ? num_read : 0;
}

virtual inline ssize_t write_file(int f, const void* buf, size_t nbytes)
{
gzFile fil = gz_files[f];
int num_written = gzwrite(fil, buf, nbytes);
return (num_written > 0) ? num_written : 0;
}

virtual void flush()
{
if (write_file(files[0], space.begin, space.index()) != (int) ((space.index())))
cerr << "error, failed to write to cache\n";
space.end = space.begin;
}

virtual bool close_file(){
gzFile fil;
if(files.index()>0){
fil = gz_files[files.pop()];
gzclose(fil);
return true;
}
return false;
}
};

#endif /* COMP_IO_BUF_H_ */
1 change: 0 additions & 1 deletion io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ embodied in the content of this file are licensed under the BSD
*/

#include "io.h"
#include "string.h"

unsigned int buf_read(io_buf &i, char* &pointer, int n)
{//return a pointer to the next n bytes. n must be smaller than the maximum size.
Expand Down
80 changes: 75 additions & 5 deletions io.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ embodied in the content of this file are licensed under the BSD
#ifndef IO_H__
#define IO_H__

#include <fcntl.h>
#include "v_array.h"
#include<iostream>

#ifndef O_LARGEFILE //for OSX
#define O_LARGEFILE 0
#endif

using namespace std;

class io_buf {
Expand All @@ -21,21 +27,68 @@ class io_buf {
v_array<char> currentname;
v_array<char> finalname;

io_buf() {
static const int READ = 1;
static const int WRITE = 2;

void init(){
size_t s = 1 << 16;
reserve(space, s);
reserve(space, s);
current = 0;
count = 0;
endloaded = space.begin;
}

virtual int open_file(const char* name, int flag=READ){
int ret;
switch(flag){
case READ:
ret = open(name, O_RDONLY|O_LARGEFILE);
if(ret!=-1)
push(files,ret);
break;

case WRITE:
ret = open(name, O_CREAT|O_WRONLY|O_LARGEFILE|O_TRUNC,0666);
if(ret!=-1)
push(files,ret);
break;

default:
cerr << "Unknown file operation. Something other than READ/WRITE specified" << endl;
ret = -1;
}
return ret;
}

virtual void reset_file(int f){
lseek(f, 0, SEEK_SET);
endloaded = space.begin;
space.end = space.begin;
}

io_buf() {
init();
}

virtual ~io_buf(){
free(files.begin);
free(space.begin);
}

void set(char *p){space.end = p;}

virtual ssize_t read_file(int f, void* buf, size_t nbytes){
return read(f, buf, nbytes);
}

size_t fill(int f) {
if (space.end_array - endloaded == 0)
{
size_t offset = endloaded - space.begin;
reserve(space, 2 * (space.end_array - space.begin));
endloaded = space.begin+offset;
}
ssize_t num_read = read(f, endloaded, space.end_array - endloaded);
ssize_t num_read = read_file(f, endloaded, space.end_array - endloaded);
if (num_read >= 0)
{
endloaded = endloaded+num_read;
Expand All @@ -44,10 +97,27 @@ class io_buf {
else
return 0;
}
void flush() {
if (write(files[0], space.begin, space.index()) != (int) space.index())

virtual ssize_t write_file(int f, const void* buf, size_t nbytes){
return write(f, buf, nbytes);
}

virtual void flush() {
if (write_file(files[0], space.begin, space.index()) != (int) space.index())
cerr << "error, failed to write example\n";
space.end = space.begin; fsync(files[0]); }

virtual bool close_file(){
if(files.index()>0){
close(files.pop());
return true;
}
return false;
}

void close_files(){
while(close_file());
}
};

void buf_write(io_buf &o, char* &pointer, int n);
Expand Down
2 changes: 1 addition & 1 deletion main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ int main(int argc, char *argv[]) {
cerr << endl;
}

free(vars);
delete vars;

return 0;
}
19 changes: 10 additions & 9 deletions multisource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,34 @@ void reset(partial_example &ex)
int receive_features(parser* p, void* ex)
{
example* ae = (example*)ex;
io_buf* input = p->input;
fd_set fds;
FD_ZERO(&fds);
for (int* sock= p->input.files.begin; sock != p->input.files.end; sock++)
for (int* sock= input->files.begin; sock != input->files.end; sock++)
FD_SET(*sock,&fds);

while (p->input.files.index() > 0)
while (input->files.index() > 0)
{
if (select(p->max_fd,&fds,NULL, NULL, NULL) == -1)
{
cerr << "Select failed!" << endl;
perror(NULL);
exit (1);
}
for (int index = 0; index < (int)p->input.files.index(); index++)
for (int index = 0; index < (int)input->files.index(); index++)
{
int sock = p->input.files[index];
int sock = input->files[index];
if (FD_ISSET(sock, &fds))
{//there is a feature or label to read
prediction pre;
if (!blocking_get_prediction(sock, pre) )
{
FD_CLR(sock, &fds);
close(sock);
memmove(p->input.files.begin+index,
p->input.files.begin+index+1,
(p->input.files.index() - index-1)*sizeof(int));
p->input.files.pop();
memmove(input->files.begin+index,
input->files.begin+index+1,
(input->files.index() - index-1)*sizeof(int));
input->files.pop();
memmove(p->ids.begin+index,
p->ids.begin+index+1,
(p->ids.index() - index-1)*sizeof(size_t));
Expand Down Expand Up @@ -116,7 +117,7 @@ int receive_features(parser* p, void* ex)
bufread_simple_label(&(p->pes[ring_index].ld), c);
}

if( p->pes[ring_index].features.index() == p->input.count )
if( p->pes[ring_index].features.index() == input->count )
{
push( ae->indices, multindex );
push_many( ae->atomics[multindex], p->pes[ring_index].features.begin, p->pes[ring_index].features.index() );
Expand Down
7 changes: 6 additions & 1 deletion parse_args.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
("thread_bits", po::value<size_t>(&global.thread_bits)->default_value(0), "log_2 threads")
("loss_function", po::value<string>()->default_value("squaredloss"), "Specify the loss function to be used, uses squaredloss by default. Currently available ones are squaredloss, hingeloss, logloss and quantileloss.")
("quantile_tau", po::value<double>()->default_value(0.5), "Parameter \\tau associated with Quantile loss. Defaults to 0.5")
("unique_id", po::value<size_t>(&global.unique_id)->default_value(0),"unique id used for cluster parallel");
("unique_id", po::value<size_t>(&global.unique_id)->default_value(0),"unique id used for cluster parallel")
("compressed", "use gzip format whenever appropriate. If a cache file is being created, this option creates a compressed cache file. A mixture of raw-text & compressed inputs are supported if this option is on");

global.example_number = 0;
global.weighted_examples = 0.;
Expand Down Expand Up @@ -101,6 +102,10 @@ po::variables_map parse_args(int argc, char *argv[], boost::program_options::opt
global.num_bits = vm["bit_precision"].as< size_t>();
}

if(vm.count("compressed")){
set_compressed(par);
}

if (global.num_bits > 31) {
cerr << "The system limits at 31 bits of precision!\n" << endl;
exit(1);
Expand Down
2 changes: 1 addition & 1 deletion parse_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ int read_features(parser* p, void* ex)
{
example* ae = (example*)ex;
char *line=NULL;
int num_chars = readto(p->input, line, '\n');
int num_chars = readto(*(p->input), line, '\n');
if (num_chars == 0)
return num_chars;

Expand Down
Loading

0 comments on commit 4ab9d6a

Please sign in to comment.