Python - parsing binary data files
My main constraint is that I am requiring Python 2.7.x. I should really make the read work with Python 3.x, but I'm not sure how to do that.
But... I am now at a point where I want to create the complete and final version. I can then back fill the second half of the chapter. The question is what do you all think should be in the end code and how does what I have so far stack up? I've just taken a stab at a __geo_interface__ and I've used Shapely. It can now do GeoJSON I want to provide simple SQLite, CSV, and KML writing.
If you are into python and up for giving the chapter and code a look, I could really use some feedback!
Things on the todo list... should do or not do any of these and if I should implement them, are there any strong opinions on how or best examples?
- How to produce a good python package with a nice setup.py using distribute
- documentation - pydoc and/or sphinx (it seems that epydoc is out of favor)
- unit and doc testing
- optparse command line interface
- using sqlite3 to make a simple database
- allowing the iterator to handle every nth sample or have a minimum distance between returned datagrams
- kml output that has some flexibility
- Possibly examples using sqlalchemy and/or sqlobject ORMs to access a database
Should I include a templated ISO XML metadata creation? If yes, which
templating engine would be best? I can use Python 2.7's .format or
one of the many other templating engines, but which one?
The code I have so far:
#!/usr/bin/env python
import math
import struct
import os
import mmap
import shapely.geometry
import geojson
datagram_size = 136
field_names = ('time', 'latitude', 'longitude', 'altitude',
'x_vel', 'y_vel', 'z_vel',
'roll', 'pitch', 'platform_heading', 'wander_angle',
'x_acceleration', 'y_acceleration', 'z_acceleration',
'x_angular_rate', 'y_angular_rate', 'z_angular',
'lon_deg', 'lat_deg')
class SbetDatagram(object):
def __init__(self, data, offset=0):
'Unpack the values from a datagram'
values = struct.unpack('17d', data[ offset : offset+ datagram_size ])
sbet_values = dict(zip (field_names, values))
sbet_values['lat_deg'] = math.degrees(sbet_values['latitude'])
sbet_values['lon_deg'] = math.degrees(sbet_values['longitude'])
self.__dict__.update(sbet_values)
@property
def geom(self):
return shapely.geometry.Point(self.lon_deg, self.lat_deg)
@property
def __geo_interface__(self):
'Provide a Geo Interface for GeoJSON serialization'
#
return {'type': 'Point', 'coordinates': (self.x, self.y)}
class SbetFile(object):
def __init__(self, filename, use_mmap=True):
self.filename = filename
sbet_file = open(filename,'rb')
if use_mmap:
sbet_size = os.path.getsize(filename)
self.data = mmap.mmap(sbet_file.fileno(), sbet_size, access=mmap.ACCESS_READ)
else:
self.data = sbet_file.read()
# Make sure the file is sane
assert(len(self.data)%datagram_size == 0)
self.num_datagrams = len(self.data) / datagram_size
def get_offset(self, datagram_index):
return datagram_index * datagram_size
def get_datagram(self, datagram_index):
offset = self.get_offset(datagram_index)
#values = self.decode(offset)
dg = SbetDatagram(self.data, offset)
return dg
def __repr__(self):
# http://docs.python.org/reference/datamodel.html#object.__repr__
return 'Sbet('+self.filename+')'
def __unicode__(self):
return unicode(self.__str__())
def __str__(self):
# http://docs.python.org/reference/datamodel.html#object.__str__
return 'SBet:' + self.filename
@property
def metadata(self):
'''Summary of an SBet - give min/max for all parameters.
May be slow the first time it is requested.'''
if '_metadata_' not in self.__dict__:
# Compute and cache the metadata
m = {}
for name in field_names:
m[name+'_min'] = None
m[name+'_max'] = None
for dg in self:
for name in field_names: # + ('',''):
if m[name+'_min'] is None or dg.__dict__[name] < m[name+'_max']:
m[name+'_min'] = dg.__dict__[name]
if m[name+'_max'] is None or dg.__dict__[name] > m[name+'_max']:
m[name+'_max'] = dg.__dict__[name]
self._metadata_ = m
return self._metadata_
def geom(self):
m = self.metadata
return shapely.geometry.Polygon( [
(m['lon_deg_min'],m['lat_deg_min']),
(m['lon_deg_min'],m['lat_deg_max']),
(m['lon_deg_max'],m['lat_deg_max']),
(m['lon_deg_max'],m['lat_deg_min']),
] )
@property
def __geo_interface__(self):
# http://geojson.org/geojson-spec.html#bounding-boxes
m = self.metadata
r = { "type": "Feature",
"id": self.filename,
"bbox": [m['lon_deg_min'], m['lat_deg_min'], m['lon_deg_max'], m['lat_deg_max']],
"geometry": {
"type": "Polygon",
"coordinates": list(self.geom().boundary.coords)
}
}
return r
def __iter__(self):
return SbetIterator(self)
class SbetIterator(object):
'Independent iterator class for Sbet files'
def __init__(self,sbet):
self.sbet = sbet
self.iter_position = 0
def __iter__(self):
return self
def next(self):
if self.iter_position >= self.sbet.num_datagrams:
raise StopIteration
values = self.sbet.get_datagram(self.iter_position)
self.iter_position += 1
return values
def main():
sbet = SbetFile('sample.sbet')
geodata = geojson.dumps(sbet)
print 'geodata:',geodata
decoded = geojson.loads(geodata)
print 'decoded:',decoded
if __name__ == '__main__':
main()