Skip to content

Commit

Permalink
Add merge command
Browse files Browse the repository at this point in the history
  • Loading branch information
jdesboeufs committed Aug 29, 2017
1 parent ca82b2a commit 11f7d74
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 16 deletions.
7 changes: 7 additions & 0 deletions bin/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ program
require('../lib/commands/extract')(workdir, options).catch(boom)
})

program
.command('merge [workDir]')
.description('merge communes into departements')
.action(workdir => {
require('../lib/commands/merge')(workdir).catch(boom)
})

program
.parse(process.argv)

Expand Down
3 changes: 2 additions & 1 deletion lib/commands/extract.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Promise = require('bluebird')
const ProgressBar = require('ascii-progress')
const rimraf = require('rimraf')

const { isCodeDepartement } = require('../util/codes')
const extractDepartement = require('../extract/departement')

const rimrafAsync = promisify(rimraf)
Expand All @@ -22,7 +23,7 @@ async function handler(workDir, { writeRaw, numWorkers }) {
await rimrafAsync(destDir)

const files = await readdirAsync(join(srcDir, 'departements'))
const departementsFound = files.filter(p => p.match(/^([A-Z0-9]{2,3})$/i))
const departementsFound = files.filter(isCodeDepartement)

const overallBar = new ProgressBar({
schema: ` overall conversion [:bar] :percent (:current/:total) :elapseds/:etas`,
Expand Down
77 changes: 77 additions & 0 deletions lib/commands/merge.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
'use strict'

const { resolve, join } = require('path')
const { readdir } = require('fs')
const { promisify } = require('util')

const Promise = require('bluebird')

const { createGeoJSONReadStream, createGeoJSONWriteStream } = require('../util/geo')
const { isCodeDepartement, isCodeCommune } = require('../util/codes')

const readdirAsync = promisify(readdir)


async function handler(workDir) {
workDir = workDir ? resolve(workDir) : join(__dirname, '..', '..', '.tmp')
const departementsPath = join(workDir, 'geojson', 'departements')

const children = await readdirAsync(departementsPath)
const departements = children.filter(isCodeDepartement)

await Promise.each(departements, async departement => {
await mergeDepartementCommunes(join(departementsPath, departement))
console.log(' merged departement %d', departement)
})
}

async function mergeDepartementCommunes(basePath) {
const communesPath = join(basePath, 'communes')

const children = await readdirAsync(communesPath)
const communes = children.filter(isCodeCommune)

const layers = {}

await Promise.each(communes, async commune => {
const communePath = join(communesPath, commune)
const communeLayers = await readdirAsync(communePath)

communeLayers
.filter(p => p.endsWith('.json.gz'))
.forEach(path => {
const layerName = path.substring(0, path.length - 9)
if (!(layerName in layers)) {
layers[layerName] = []
}
layers[layerName].push(join(communePath, path))
})
})

await Promise.all(Object.keys(layers).map(layerName => {
return new Promise((resolve, reject) => {
const layerPath = join(basePath, layerName + '.json.gz')
const mergedStream = createGeoJSONWriteStream(layerPath)
mergedStream.setMaxListeners(Infinity)

let count = layers[layerName].length

layers[layerName].forEach(srcFile => {
const srcStream = createGeoJSONReadStream(srcFile)
srcStream.pipe(mergedStream, { end: false })
srcStream
.on('error', reject)
.on('end', () => {
count--
if (count === 0) mergedStream.end()
})
})

mergedStream
.on('error', reject)
.on('finish', resolve)
})
}))
}

module.exports = handler
4 changes: 3 additions & 1 deletion lib/extract/departement.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const { promisify } = require('util')
const workerFarm = require('worker-farm')
const Promise = require('bluebird')

const { isCodeCommune } = require('../util/codes')

const readdirAsync = promisify(readdir)

const extractCommuneWorkers = workerFarm(
Expand All @@ -30,7 +32,7 @@ function extractDepartement(baseSrcPath, baseDestPath, codeDep, writeRaw = false
.then(files => {

const communesFound = files
.filter(p => p.match(/^([A-Z0-9]{2,3})([0-9]{2})$/i))
.filter(isCodeCommune)

/* Progression */
extractor.total = communesFound.length
Expand Down
10 changes: 9 additions & 1 deletion lib/util/codes.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,12 @@ function getCodeDep(codeCommune) {
codeCommune.substr(0, 2)
}

module.exports = { getCodeDep }
function isCodeDepartement(string) {
return string.match(/^([0-9A-Z]{2,3})$/)
}

function isCodeCommune(string) {
return string.match(/^([0-9A-Z]{2}[0-9]{3})$/)
}

module.exports = { getCodeDep, isCodeDepartement, isCodeCommune }
52 changes: 39 additions & 13 deletions lib/util/geo.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
'use strict'

const { createWriteStream } = require('fs')
const { createGzip } = require('zlib')
const { createWriteStream, createReadStream } = require('fs')
const { createGzip, createGunzip } = require('zlib')

const turf = require('@turf/turf')
const { stringify } = require('JSONStream')
const { stringify, parse } = require('JSONStream')
const { pipeline, through } = require('mississippi')

const GEOJSON = {
open: '{"type":"FeatureCollection","features":[\n',
separator: ',\n',
close: ']}',
}

function filterArea(minArea) {
return through.obj((feature, enc, cb) => {
if (!feature.geometry || !['Polygon', 'MultiPolygon'].includes(feature.geometry.type)) {
Expand Down Expand Up @@ -51,16 +45,48 @@ function cleanCoords(options = {}) {
})
}

const GEOJSON = {
open: '{"type":"FeatureCollection","features":[\n',
separator: ',\n',
close: ']}',
}

function createGeoJSONWriteStream(path) {
return pipeline.obj(
stringify(GEOJSON.open, GEOJSON.separator, GEOJSON.close),
createGzip(),
createWriteStream(path)
)
}

function createGeoJSONReadStream(path) {
const file = createReadStream(path)
const gunzip = createGunzip()
const parser = parse('features.*')

function onError(err) {
file.destroy()
gunzip.destroy()
parser.emit('error', err)
parser.destroy()
}

file.on('error', onError)
gunzip.on('error', onError)

file.pipe(gunzip).pipe(parser)

return parser
}

function createGeoJSONWritableStream(path) {
return pipeline.obj(
filterArea(1),
truncate({ precision: 7, mutate: true }),
cleanCoords({ mutate: true }),
rewind({ mutate: true }),
stringify(GEOJSON.open, GEOJSON.separator, GEOJSON.close),
createGzip(),
createWriteStream(path)
createGeoJSONWriteStream(path)
)
}

module.exports = { createGeoJSONWritableStream }
module.exports = { createGeoJSONReadStream, createGeoJSONWriteStream, createGeoJSONWritableStream }

0 comments on commit 11f7d74

Please sign in to comment.