001 /**
002 *
003 */
004 package org.wdssii.polarmerger;
005
006 import java.util.ArrayList;
007 import java.util.Date;
008 import java.util.List;
009
010 import org.apache.commons.logging.Log;
011 import org.apache.commons.logging.LogFactory;
012 import org.wdssii.core.Algorithm;
013 import org.wdssii.core.BuilderFactory;
014 import org.wdssii.core.DataEncoder;
015 import org.wdssii.core.DataType;
016 import org.wdssii.core.IndexRecord;
017 import org.wdssii.core.Location;
018 import org.wdssii.core.PolarGrid;
019 import org.wdssii.core.Radial;
020 import org.wdssii.core.RadialSet;
021
022 /**
023 * @author lakshman
024 *
025 */
026 public class PolarMerger extends Algorithm {
027 private static Log log = LogFactory.getLog(PolarMerger.class);
028
029 public static void main(String[] args) {
030 final PolarMerger alg = new PolarMerger();
031 alg.setupAndExecute(args);
032 }
033
034 private String inputType = "Reflectivity";
035
036 private boolean compositeEnabled = true;
037
038 private int writeIntervalSeconds = 60;
039
040 private boolean writeAfterEveryInputReceived = false;
041
042 private float maxElevation = 20.0f;
043
044 private float azimuthalSpacingDegrees = 0.5f;
045
046 private float gateWidthKms = 0.5f;
047
048 //private int numAz = Math.round(360.0f / azimuthalSpacingDegrees);
049
050 private int numGates = 500;
051
052 private float ageOffInMinutes = 10;
053
054 private int numElev = Math.round(maxElevation / azimuthalSpacingDegrees);
055
056 public enum MergerStrategy { DistanceWeighted, NearestNeighbor, Latest };
057
058 private MergerStrategy mergerStrategy = MergerStrategy.DistanceWeighted;
059
060 public float getAzimuthalSpacingDegrees() {
061 return azimuthalSpacingDegrees;
062 }
063
064 public void setAzimuthalSpacingDegrees(float azimuthalSpacingDegrees) {
065 this.azimuthalSpacingDegrees = azimuthalSpacingDegrees;
066 reset();
067 }
068
069 public boolean isCompositeEnabled() {
070 return compositeEnabled;
071 }
072
073 public void setCompositeEnabled(boolean compositeEnabled) {
074 this.compositeEnabled = compositeEnabled;
075 }
076
077 public float getMaxElevation() {
078 return maxElevation;
079 }
080
081 public void setMaxElevation(float elevationDegrees) {
082 this.maxElevation = elevationDegrees;
083 reset();
084 }
085
086 private void reset() {
087 this.observations = null;
088 this.lookup = null;
089 //this.numAz = Math.round(360.0f / azimuthalSpacingDegrees);
090 this.numElev = Math.round(maxElevation / azimuthalSpacingDegrees);
091 }
092
093 public float getGateWidthKms() {
094 return gateWidthKms;
095 }
096
097 public void setGateWidthKms(float gateWidthKms) {
098 this.gateWidthKms = gateWidthKms;
099 }
100
101 public String getInputType() {
102 return inputType;
103 }
104
105 public void setInputType(String inputType) {
106 this.inputType = inputType;
107 }
108
109 public int getNumGates() {
110 return numGates;
111 }
112
113 public void setNumGates(int numGates) {
114 this.numGates = numGates;
115 }
116
117 public int getWriteIntervalSeconds() {
118 return writeIntervalSeconds;
119 }
120
121 public void setWriteIntervalSeconds(int writeIntervalSeconds) {
122 this.writeIntervalSeconds = writeIntervalSeconds;
123 }
124
125 public float getAgeOffInMinutes() {
126 return ageOffInMinutes;
127 }
128
129 public void setAgeOffInMinutes(float ageOffInMinutes) {
130 this.ageOffInMinutes = ageOffInMinutes;
131 }
132
133 public String getOutputType() {
134 return outputType;
135 }
136
137 public void setOutputType(String outputType) {
138 this.outputType = outputType;
139 }
140
141 /**
142 * One of the MergerStrategy enums
143 * @return mergerStrategy
144 */
145 public String getMergerStrategy() {
146 return mergerStrategy.toString();
147 }
148
149 /**
150 * One of the MergerStrategy enums
151 * @param mergerStrategy
152 */
153 public void setMergerStrategy(String mergerStrategy) {
154 this.mergerStrategy = MergerStrategy.valueOf(mergerStrategy);
155 }
156
157 public void handleRecord(IndexRecord rec) {
158 try {
159 if (log.isDebugEnabled()) {
160 log.debug("Received: " + rec);
161 }
162 if (!rec.getDataType().equals(inputType)) {
163 return;
164 }
165 long start = 0;
166 if (log.isInfoEnabled()) {
167 start = System.currentTimeMillis();
168 log.info("Starting to process " + rec);
169 }
170 RadialSet rs = (RadialSet) BuilderFactory.createDataType(rec);
171 update(rs);
172 if (log.isInfoEnabled()) {
173 long howlong = System.currentTimeMillis() - start;
174 log.info("Took " + howlong / 1000.0 + "s to process " + rec);
175 }
176 computeAndWriteIfNeeded();
177 } catch (Exception e) {
178 log.error("Error handling " + rec, e);
179 }
180 }
181
182 private void computeAndWriteIfNeeded() {
183 if (lastWrite == null || lastUpdate == null){
184 return;
185 }
186 long timeSinceLastWrite = lastUpdate.getTime() - lastWrite.getTime();
187 if (writeAfterEveryInputReceived || timeSinceLastWrite > getWriteIntervalSeconds() * 1000) {
188 if (log.isInfoEnabled()) {
189 log.info(timeSinceLastWrite/1000.0
190 + " seconds since lastWrite: computing and writing outputs");
191 }
192 computeAndWriteOutputs();
193 }
194 }
195
196 private int numNewRadials = 0;
197
198 private MergeableObservations observations = null;
199
200 private PowerDensityLookup lookup = null;
201
202 private Date lastUpdate = null;
203
204 private Date lastWrite = null;
205
206 private Location radarLocation = null;
207
208 private float nyquist = 0;
209
210 private String outputType = "";
211
212 private synchronized void update(RadialSet rs) {
213 initIfNeeded(rs);
214 if (lastUpdate == null || rs.getTime().after(lastUpdate)) {
215 lastUpdate = rs.getTime();
216 if (lastWrite == null) {
217 // first time around
218 lastWrite = lastUpdate;
219 }
220 }
221 Radial[] radials = rs.getRadials();
222 for (int i = 0; i < radials.length; ++i) {
223 update(radials[i], rs.getElevation(), rs.getRangeToFirstGateKms(), rs.getTime());
224 }
225 numNewRadials += radials.length;
226 if (log.isDebugEnabled()) {
227 log.debug("Finished updating with " + radials.length);
228 }
229 }
230
231 private synchronized void initIfNeeded(RadialSet rs) {
232 float beamwidth = rs.getBeamWidth();
233 float gatewidth = rs.getGateWidthKms();
234 this.radarLocation = rs.getRadarLocation();
235 this.nyquist = rs.getNyquist();
236
237 // init grid if needed
238 if (observations == null) {
239 lookup = new PowerDensityLookup(beamwidth, azimuthalSpacingDegrees,
240 gatewidth, gateWidthKms);
241 observations = new MergeableObservations(mergerStrategy, lookup);
242 }
243
244 if (outputType == null || outputType.length() == 0) {
245 outputType = rs.getTypeName();
246 }
247 }
248
249 private synchronized void update(Radial radial, float elevation, float dist_to_first_gate, Date date) {
250 int center_az = Math.round(radial.getMidAzimuth()
251 / lookup.getAzimuthalSpacingDegrees());
252 int center_e = Math.round(elevation
253 / lookup.getAzimuthalSpacingDegrees());
254
255 float[] radial_values = radial.getValues();
256
257 for (int r = 0; r < radial_values.length; ++r){
258 float value = radial_values[r];
259 if (value != DataType.MissingData){
260 float range = radial.getGateWidthKms()*r + dist_to_first_gate;
261 int center_rn = Math.round(range/lookup.getGateSpacingKms());
262 Observation obs = new Observation(center_e, center_az, center_rn, value, date);
263 observations.add(obs);
264 }
265 }
266
267 }
268
269 private final float distToFirst = 0;
270 protected synchronized void computeAndWriteOutputs() {
271 if (numNewRadials > 0) {
272 if (log.isDebugEnabled()) {
273 log.debug("Writing new product with " + numNewRadials
274 + " updated/new radials");
275 }
276 // Prune
277 Date pruneTime = new Date(lastUpdate.getTime() - (int)(ageOffInMinutes
278 * 60 * 1000));
279 int numPruned = observations.pruneBefore(pruneTime);
280 if (log.isInfoEnabled()) {
281 log.info("Pruned " + numPruned + " observations before "
282 + pruneTime);
283 }
284
285 // Compute output polar grids
286 PolarGrid[] mergedGrids = new PolarGrid[numElev];
287 for (int e = 0; e < numElev; ++e) {
288 float elevation = e * azimuthalSpacingDegrees;
289 // Create polar grid to hold the merged value. Initialize at zero
290 PolarGrid pg = new PolarGrid(radarLocation, lastUpdate, outputType,
291 azimuthalSpacingDegrees, gateWidthKms, elevation,
292 lookup.getBeamwidth(), nyquist, distToFirst, numGates, 0f);
293 mergedGrids[e] = pg;
294 }
295 observations.fillMergedValues(mergedGrids);
296
297 // Compute algorithms
298 PolarGrid[] algs = performAlgorithms(mergedGrids);
299
300 PolarGrid[] grids = new PolarGrid[mergedGrids.length + algs.length];
301 for (int i=0; i < mergedGrids.length; ++i){
302 grids[i] = mergedGrids[i];
303 }
304 for (int i=0; i < algs.length; ++i){
305 grids[mergedGrids.length+i] = algs[i];
306 }
307
308
309 // Write
310 for (PolarGrid pg : grids) {
311 String subtype = getSubTypeForElevation(pg.getElevation());
312 DataEncoder.writeDataAndNotify(pg, getOutputDir(),new String[] { subtype });
313 }
314
315 numNewRadials = 0;
316 lastWrite = lastUpdate;
317 } else {
318 log.debug("Nothing to write");
319 }
320 }
321
322 private PolarGrid[] performAlgorithms(PolarGrid[] grids) {
323 List<PolarGrid> result = new ArrayList<PolarGrid>();
324 if (isCompositeEnabled()) {
325 PolarGrid example = grids[0];
326 PolarGrid composite = new PolarGrid(example, example.getTypeName() + "Composite");
327 float[][] comp_values = composite.getValues();
328 for (int g = 0; g < grids.length; ++g) {
329 float[][] values = grids[g].getValues();
330 for (int i = 0; i < values.length; ++i) {
331 for (int j = 0; j < values[0].length; ++j) {
332 comp_values[i][j] = Math.max(comp_values[i][j],
333 values[i][j]);
334 }
335 }
336 }
337 result.add(composite);
338 }
339 return result.toArray(new PolarGrid[0]);
340 }
341
342 public boolean isWriteAfterEveryInputReceived() {
343 return writeAfterEveryInputReceived;
344 }
345
346 /** If set to true, then writeIntervalSeconds is ignored. */
347 public void setWriteAfterEveryInputReceived(boolean writeAfterEveryInputReceived) {
348 this.writeAfterEveryInputReceived = writeAfterEveryInputReceived;
349 }
350
351 }