/******************************************************************/ /* Simple File Adapter Stream Reader */ /******************************************************************/ package routingservice.adapter.simplefile; import java.io.File; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.List; import java.util.Properties; import com.rti.dds.dynamicdata.DynamicData; import com.rti.dds.typecode.TypeCode; import com.rti.routingservice.adapter.StreamReader; import com.rti.routingservice.adapter.StreamReaderListener; import com.rti.routingservice.adapter.infrastructure.AdapterException; import com.rti.routingservice.adapter.infrastructure.StreamInfo; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.util.logging.FileHandler; import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.Logger; public class SimpleFileAdapterStreamReader implements StreamReader { private String verbosity; private int readPeriod; private String fileName = null; private BufferedReader fileReader = null; private NotificationThread notificationThread = null; private DynamicData dynamicData = null; private DatagramPacket dp=null; private byte[] receive=new byte[65535]; private DatagramSocket ds=null; /** */ private void parseProperties(Properties properties) { String readPeriodStr; readPeriodStr = properties.getProperty("read_period"); if (readPeriodStr == null) { readPeriod = 1000; } else { readPeriod = new Integer(readPeriodStr).intValue(); } } /** */ SimpleFileAdapterStreamReader( StreamReaderListener listener, StreamInfo streamInfo, Properties properties, DatagramSocket ds, String verbosity) throws AdapterException { this.verbosity = verbosity; this.ds=ds; parseProperties(properties); //fileName = streamInfo.getStreamName() + ".txt"; try { //fileReader = new BufferedReader(new FileReader(new File(directory,fileName))); dp=new DatagramPacket(receive,receive.length); } catch (Exception e) { throw new AdapterException(0, "error opening " + dp); } dynamicData = new DynamicData( (TypeCode)streamInfo.getTypeInfo().getTypeRepresentation(), DynamicData.PROPERTY_DEFAULT); notificationThread = new NotificationThread(this, listener); notificationThread.start(); } /** */ void close() throws AdapterException { try { notificationThread.terminate(); notificationThread.join(); if (dp != null) { this.ds.close(); } } catch (InterruptedException e) { throw new AdapterException(0, "error finishing notification thread"); } catch (Exception e) { throw new AdapterException(0, "error closing " + fileName); } } /** */ public void read(List sampleList, List infoList) throws AdapterException { if (verbosity.equals("debug")) { System.out.println("CALL " + getClass().getName() + ".read"); } try { sampleList.clear(); infoList.clear(); dynamicData.clear_all_members(); //line = fileReader.readLine(); ds.receive(dp); String line=data(receive).toString(); System.out.println(line); dynamicData.set_string("value", DynamicData.MEMBER_ID_UNSPECIFIED, line); sampleList.add(dynamicData); receive=new byte[65535]; } catch (IOException e) { throw new AdapterException(0, "error reading from file " + fileName, e); } catch (Exception e) { throw new AdapterException(0, "error reading", e); } } /** */ public void returnLoan(List sampleList, List infoList) throws AdapterException { if (verbosity.equals("debug")) { System.out.println("CALL " + getClass().getName() + ".returnLoan"); } } /** */ public void update(Properties properties) throws AdapterException { parseProperties(properties); notificationThread.setNotificationPeriod(readPeriod); } /** * Notification thread * * This thread will notify of data availability in the file. */ class NotificationThread extends Thread { private int notificationPeriod; private boolean _terminate; private StreamReaderListener listener = null; private StreamReader streamReader = null; /** */ NotificationThread(StreamReader streamReader, StreamReaderListener listener) { this.listener = listener; this.notificationPeriod = readPeriod; this.streamReader = streamReader; _terminate = false; } /** */ @Override public void run() { while (!_terminate) { try { Thread.sleep(notificationPeriod); listener.onDataAvailable(streamReader); } catch (InterruptedException e) { _terminate = true; } catch (Exception iox) { throw new java.lang.IllegalStateException( "Unable to connect Socket", iox); } } } /** */ public void terminate() { _terminate = true; } /** */ public void setNotificationPeriod(int period) { notificationPeriod = period; } } public StringBuilder data(byte[] a){ if(a==null) return null; StringBuilder ret=new StringBuilder(); int i=0; while(a[i]!=0) { ret.append((char)a[i]); i++; } return ret; } }