Skip to content
This repository was archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
changed several variable names
Browse files Browse the repository at this point in the history
  • Loading branch information
bow authored and Wibowo Arindrarto committed Jan 16, 2012
1 parent ce752c9 commit 24fab7f
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 102 deletions.
10 changes: 5 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ seq
qual
String of phred quality characters of the base-called sequence.

qualVal
qual_val
List of phred quality values of the base-called sequence.

id
Expand All @@ -36,12 +36,12 @@ name
trim(sequence[, cutoff=0.05])
Returns a trimmed sequence using Richard Mott's algorithm (used in phred)
with the probability cutoff of 0.05. Can be used on ``seq``, ``qual``, and
``qualVal``.
``qual_val``.

get_data(key)
Returns metadata stored in the file, accepts keys from ``tags`` (see below).

export([outFile="", fmt='fasta'])
export([out_file="", fmt='fasta'])
Writes a fasta (``fmt='fasta'``), qual (``fmt='qual'``), or
fastq (``fmt='fastq'``) file from the trace file. Default format is ``fasta``.

Expand Down Expand Up @@ -76,14 +76,14 @@ Or if you want to perform base trimming directly::
>>> yummy = abifpy.Trace('tracefile.ab1', trimming=True)

Sequence can be accessed with the ``seq`` attribute. Other attributes of note
are ``qual`` for phred quality characters, ``qualVal`` for phred quality values,
are ``qual`` for phred quality characters, ``qual_val`` for phred quality values,
``id`` for sequencing trace file name, and ``name`` for the sample name::

>>> yummy.seq
'CCAAGGTGCAGACTTCCATCT'
>>> yummy.qual
'3824DESHSSSSS:DSSSSSS'
>>> yummy.qualVal
>>> yummy.qual_val
[18, 23, 17, 19, 35, 36, 50, 39, 50, 50, 50, 50, 50, 25, 35, 50, 50, 50, 50, 50, 50]
>>> yummy.id
'tracefile'
Expand Down
167 changes: 84 additions & 83 deletions abifpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from sys import version_info

__all__ = ['Trace']

# dictionary for deciding which values to extract and contain in self.data
EXTRACT = {
'TUBE1': 'well',
Expand Down Expand Up @@ -80,8 +82,8 @@ def py3_get_byte(string):

class Trace(object):
"""Class representing trace file."""
def __init__(self, inFile, trimming=False):
self._handle = open(inFile, 'rb')
def __init__(self, in_file, trimming=False):
self._handle = open(in_file, 'rb')
try:
self._handle.seek(0)
if not self._handle.read(4) == py3_get_byte('ABIF'):
Expand All @@ -108,23 +110,23 @@ def __init__(self, inFile, trimming=False):

# build dictionary of data tags and metadata
for entry in self._parse_header(header):
key = entry.tagName + str(entry.tagNum)
key = entry.tag_name + str(entry.tag_num)
self.tags[key] = entry
# only extract data from tags we care about
if key in EXTRACT:
# e.g. self.data['well'] = 'B6'
self.data[EXTRACT[key]] = self.get_data(key)

self.id = inFile.replace('.ab1', '')
self.id = in_file.replace('.ab1', '')
self.name = self.get_data('SMPL1')
self.seq = self.get_data('PBAS2')
self.qual = ''.join([chr(ord(value) + 33) for value in self.get_data('PCON2')])
self.qualVal = [ord(value) for value in self.get_data('PCON2')]
self.qual_val = [ord(value) for value in self.get_data('PCON2')]

if trimming:
self.seq, self.qual, self.qualVal = map(self.trim,
self.seq, self.qual, self.qual_val = map(self.trim,
[self.seq, self.qual,
self.qualVal])
self.qual_val])

def close(sel):
"""Closes the Trace file object."""
Expand All @@ -134,15 +136,15 @@ def __repr__(self):
"""Represents data associated with the file."""
if len(self.seq) > 10:
seq = "{0}...{1}".format(self.seq[:5], self.seq[-5:])
qualVal = "[{0}, ..., {1}]".format(
repr(self.qualVal[:5])[1:-1],
repr(self.qualVal[-5:])[1:-1])
qual_val = "[{0}, ..., {1}]".format(
repr(self.qual_val[:5])[1:-1],
repr(self.qual_val[-5:])[1:-1])
else:
seq = self.seq
qualVal = self.qualVal
qual_val = self.qual_val

return "{0}({1}, qualVal:{2}, id:{3}, name:{4})".format(
self.__class__.__name__, repr(seq), qualVal,
return "{0}({1}, qual_val:{2}, id:{3}, name:{4})".format(
self.__class__.__name__, repr(seq), qual_val,
repr(self.id), repr(self.name))

def _parse_header(self, header):
Expand All @@ -151,52 +153,51 @@ def _parse_header(self, header):
# file signature, file version, tag name, tag number,
# element type code, element size, number of elements
# data size, data offset, handle
headElemSize = header[5]
headElemNum = header[6]
headOffset = header[8]
head_elem_size = header[5]
head_elem_num = header[6]
head_offset = header[8]
index = 0

while index < headElemNum:
start = headOffset + index * headElemSize
while index < head_elem_num:
start = head_offset + index * head_elem_size
# added directory offset to tuple
# to handle directories with data size <= 4 bytes
self._handle.seek(start)
dirEntry = struct.unpack(_DIRFMT,
dir_entry = struct.unpack(_DIRFMT,
self._handle.read(struct.calcsize(_DIRFMT))) + (start,)
index += 1
yield _TraceDir(dirEntry, self._handle)
yield _TraceDir(dir_entry, self._handle)

def get_data(self, key):
"""Returns data stored in a tag."""
return self.tags[key].tagData
return self.tags[key].tag_data

def seq_remove_ambig(self, seq):
"""Replaces extra ambiguous bases with 'N'."""
seq = self.seq

import re
seq = self.seq
return re.sub("K|Y|W|M|R|S", 'N', seq)

def export(self, outFile="", fmt='fasta'):
def export(self, out_file="", fmt='fasta'):
"""Writes the trace file sequence to a fasta file.
Keyword argument:
outFile -- output file name (detault 'tracefile'.fa)
out_file -- output file name (detault 'tracefile'.fa)
fmt -- 'fasta': write fasta file, 'qual': write qual file, 'fastq': write fastq file
"""
if outFile == "":
fileName = self.id
if out_file == "":
file_name = self.id
if fmt == 'fasta':
fileName += '.fa'
file_name += '.fa'
elif fmt == 'qual':
fileName += '.qual'
file_name += '.qual'
elif fmt == 'fastq':
fileName += '.fq'
file_name += '.fq'
else:
raise ValueError('Invalid file format: {0}.'.format(fmt))
else:
fileName = outFile
file_name = out_file

if fmt == 'fasta':
contents = '>{0} {1}\n{2}\n'.format(
Expand All @@ -207,15 +208,15 @@ def export(self, outFile="", fmt='fasta'):
contents = '>{0} {1}\n{2}\n'.format(
self.id,
self.name,
' '.join(map(str, self.qualVal)))
' '.join(map(str, self.qual_val)))
elif fmt == 'fastq':
contents = '@{0} {1}\n{2}\n+{0} {1}\n{3}\n'.format(
self.id,
self.name,
self.seq, ''.join(self.qual))

with open(fileName, 'w') as outFile:
outFile.writelines(contents)
with open(file_name, 'w') as out_file:
out_file.writelines(contents)

def trim(self, seq, cutoff=0.05):
"""Trims the sequence using Richard Mott's modified trimming algorithm.
Expand All @@ -235,101 +236,101 @@ def trim(self, seq, cutoff=0.05):
start = False
# set minimum segment size
segment = 20
trimStart = 0
trim_start = 0

if len(seq) <= segment:
raise ValueError('Sequence can not be trimmed because \
it is shorter than the trim segment size')
else:
# calculate probability back from formula used
# to calculate phred qual values
scoreList = [cutoff - (10 ** (qual/-10.0)) for
qual in self.qualVal]
score_list = [cutoff - (10 ** (qual/-10.0)) for
qual in self.qual_val]

# calculate cummulative scoreList
# calculate cummulative score_list
# if cummulative value < 0, set to 0
# first value is set to 0 (assumption: trimStart is always > 0)
runningSum = [0]
for i in range(1, len(scoreList)):
num = runningSum[-1] + scoreList[i]
# first value is set to 0 (assumption: trim_start is always > 0)
running_sum = [0]
for i in range(1, len(score_list)):
num = running_sum[-1] + score_list[i]
if num < 0:
runningSum.append(0)
running_sum.append(0)
else:
runningSum.append(num)
running_sum.append(num)
if not start:
# trimStart = value when cummulative starts to be > 0
trimStart = i
# trim_start = value when cummulative starts to be > 0
trim_start = i
start = True

# trimFinish = index of the highest cummulative value,
# trim_finish = index of the highest cummulative value,
# marking the segment with the highest cummulative score
trimFinish = runningSum.index(max(runningSum))
trim_finish = running_sum.index(max(running_sum))

return seq[trimStart:trimFinish]
return seq[trim_start:trim_finish]

class _TraceDir(object):
"""Class representing directory content."""
def __init__(self, tagEntry, handle):
self.tagName = py3_get_string(tagEntry[0])
self.tagNum = tagEntry[1]
self.elemCode = tagEntry[2]
self.elemSize = tagEntry[3]
self.elemNum = tagEntry[4]
self.dataSize = tagEntry[5]
self.dataOffset = tagEntry[6]
self.dataHandle = tagEntry[7]
self.tagOffset = tagEntry[8]
def __init__(self, tag_entry, handle):
self.tag_name = py3_get_string(tag_entry[0])
self.tag_num = tag_entry[1]
self.elem_code = tag_entry[2]
self.elem_size = tag_entry[3]
self.elem_num = tag_entry[4]
self.data_size = tag_entry[5]
self.data_offset = tag_entry[6]
self.data_handle = tag_entry[7]
self.tag_offset = tag_entry[8]

# if data size is <= 4 bytes, data is stored inside the directory
# so offset needs to be changed
if self.dataSize <= 4:
self.dataOffset = self.tagOffset + 20
if self.data_size <= 4:
self.data_offset = self.tag_offset + 20

self.tagData = self._unpack(handle)
self.tag_data = self._unpack(handle)

def __repr__(self):
"""Represents data associated with a tag."""
summary = ['tagName: {0}'.format(repr(self.tagName))]
summary.append('tagNumber: {0}'.format(repr(self.tagNum)))
summary.append('elemCode: {0}'.format(repr(self.elemCode)))
summary.append('elemSize: {0}'.format(repr(self.elemSize)))
summary.append('elemNum: {0}'.format(repr(self.elemNum)))
summary.append('dataSize: {0}'.format(repr(self.dataSize)))
summary.append('dataOffset: {0}'.format(repr(self.dataOffset)))
summary.append('dataHandle: {0}'.format(repr(self.dataHandle)))
summary.append('tagOffset: {0}'.format(repr(self.tagOffset)))
summary.append('tagData: {0}'.format(repr(self.tagData)))
summary = ['tag_name: {0}'.format(repr(self.tag_name))]
summary.append('tag_number: {0}'.format(repr(self.tag_num)))
summary.append('elem_code: {0}'.format(repr(self.elem_code)))
summary.append('elem_size: {0}'.format(repr(self.elem_size)))
summary.append('elem_num: {0}'.format(repr(self.elem_num)))
summary.append('data_size: {0}'.format(repr(self.data_size)))
summary.append('data_offset: {0}'.format(repr(self.data_offset)))
summary.append('data_handle: {0}'.format(repr(self.data_handle)))
summary.append('tag_offset: {0}'.format(repr(self.tag_offset)))
summary.append('tag_data: {0}'.format(repr(self.tag_data)))

return '\n'.join(summary)

def _unpack(self, handle):
"""Returns tag data"""
if self.elemCode in _BYTEFMT:
if self.elem_code in _BYTEFMT:

# because ">1s" unpacks differently from ">s"
num = '' if self.elemNum == 1 else str(self.elemNum)
fmt = "{0}{1}{2}".format('>', num, _BYTEFMT[self.elemCode])
start = self.dataOffset
num = '' if self.elem_num == 1 else str(self.elem_num)
fmt = "{0}{1}{2}".format('>', num, _BYTEFMT[self.elem_code])
start = self.data_offset

handle.seek(start)
data = struct.unpack(fmt, handle.read(struct.calcsize(fmt)))

# no need to use tuple if len(data) == 1
if self.elemCode not in [10, 11] and len(data) == 1:
if self.elem_code not in [10, 11] and len(data) == 1:
data = data[0]

# account for different data types
if self.elemCode == 2:
if self.elem_code == 2:
return py3_get_string(data)
elif self.elemCode == 10:
elif self.elem_code == 10:
return datetime.date(*data)
elif self.elemCode == 11:
elif self.elem_code == 11:
return datetime.time(*data)
elif self.elemCode == 13:
elif self.elem_code == 13:
return bool(data)
elif self.elemCode == 18:
elif self.elem_code == 18:
return py3_get_string(data[1:])
elif self.elemCode == 19:
elif self.elem_code == 19:
return py3_get_string(data[:-1])
else:
return data
Expand Down
Loading

0 comments on commit 24fab7f

Please sign in to comment.