gtkIOStream  1.7.0
GTK+ << C++ IOStream operators for GTK+. Now with ORBing, numerical computation, audio client and more ...
IIOThreadedQ.H
Go to the documentation of this file.
1 /* Copyright 2000-2018 Matt Flax <flatmax@flatmax.org>
2  This file is part of GTK+ IOStream class set
3 
4  GTK+ IOStream is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or
7  (at your option) any later version.
8 
9  GTK+ IOStream is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You have received a copy of the GNU General Public License
15  along with GTK+ IOStream
16 */
17 
18 #ifndef IIOTHREADEDQ_H_
19 #define IIOTHREADEDQ_H_
20 
21 #include "IIO.H"
22 #include "Thread.H"
23 #include "BlockBuffer.H"
24 
25 class IIOThreadedQ : public IIO, public ThreadedMethod, public Cond, public BlockBuffer {
26 
30  void *threadMain(void) {
31  struct sched_param param;
32  param.sched_priority = 96;
33  if (sched_setscheduler(0, SCHED_FIFO, & param) == -1) {
34  perror("sched_setscheduler");
35  return NULL;
36  }
37 
38  struct timespec lockStart, lockStop;
39 
40  Eigen::Array<unsigned short, Eigen::Dynamic, Eigen::Dynamic> *b; // get an empty buffer for query
41 
42  cout<<"entering the thread while loop"<<endl;
43  while (1) {
44 
45  if( clock_gettime( CLOCK_REALTIME, &lockStart) == -1 )
46  cout<<"clock lockStart get time error"<<endl;
47  int nframes=0;
48  b=getEmptyBuffer(); // get an empty buffer
49  if (!b) {
50  cout<<"threadMain : Error : couldn't get a valid empty buffer - possibly dropping samples.\n";
51  usleep(1000); // sleep for a ms
52  } else {
53  nframes=getReadArraySampleCount(*b);
54 // cout<<"b rows,cols = "<<b->rows()<<","<<b->cols()<<'\n';
55 // cout<<"planning to read nframes="<<nframes<<" per cycle\n";
56  int ret=read(nframes, *b);
57  if (ret!=NO_ERROR)
58  break;
59 
60  putFullBuffer(b); // put the now full buffer onto the full buffer queue
61 
62  lock(); // lock the mutex, indicate the condition and wake the thread.
63  newbufReady=true;
64  signal(); // Wake the WaitingThread
65  unLock(); // Unlock so the WaitingThread can continue.
66 
67  }
68  if( clock_gettime( CLOCK_REALTIME, &lockStop) == -1 )
69  cout<<"clock lockStop get time error"<<endl;
70 
71  double duration = 1.e3*( lockStop.tv_sec - lockStart.tv_sec ) + (double)( lockStop.tv_nsec - lockStart.tv_nsec )/1.e6;
72  cout<<"thread duration = "<<duration<<'\t';
73  if (duration<(.5*(float)nframes/sampleRate*1.e3)){
74  //cout<<"too quick, adding more time"<<endl;
75  usleep((int)floor((float)nframes*.5/sampleRate*1.e6));
76  }
77 
78  if( clock_gettime( CLOCK_REALTIME, &lockStop) == -1 )
79  cout<<"clock lockStop get time error"<<endl;
80 
81  duration = 1.e3*( lockStop.tv_sec - lockStart.tv_sec ) + (double)( lockStop.tv_nsec - lockStart.tv_nsec )/1.e6;
82  cout<<"thread duration now = "<<duration<<'\n';
83  }
84 
85  cout<<"IIO read thread stopped due to error"<<endl;
86  return NULL;
87  }
88 
96  int resizeBuffers(int N, int ch) {
97  // if the data matrix is larger in columns then the number of capture channels, then resize it.
98 
99  // find out how big the buffers should be
100  Eigen::Array<unsigned short, Eigen::Dynamic, Eigen::Dynamic> b;
101  int retVal=getReadArray(N, b);
102  if (retVal!=NO_ERROR)
103  return retVal;
104 
105  ch=(int)ceil((float)ch/(float)operator[](0).getChCnt()); // check whether we require less then the available number of channels
106  if (b.cols()<ch)
107  ch=b.cols();
108  cout<<"resizing buffers to "<<b.rows()<<" rows and "<<ch<<" cols"<<endl;
109 
110  // ensure that the buffers exist with the correct sizes
111  BlockBuffer::resizeBuffers(b.rows(), ch);
112  //setChannelBufferCnt(N*2);
113  return NO_ERROR;
114  }
115 
116 public:
// Sample rate in frames per second : threadMain computes nframes/sampleRate as a time in seconds when pacing its loop.
117  float sampleRate;
119 
122  newbufReady=false;
123  }
124 
// Virtual destructor with an empty body.
125  virtual ~IIOThreadedQ() {}
126 
128 // int oneDevChCnt=operator[](0).getChCnt();
129 // int cols=(int)ceil((float)ch/(float)oneDevChCnt); // check whether we require less than the available number of channels
130 // return IIOThreadedQ::resizeBuffers(N*oneDevChCnt, ch);
131  return IIOThreadedQ::resizeBuffers(N, ch);
132  }
133 };
134 
135 #endif // IIOTHREADEDQ_H_
void putFullBuffer(Eigen::Array< unsigned short, Eigen::Dynamic, Eigen::Dynamic > *fb)
Definition: BlockBuffer.H:93
int N
void signal()
Definition: Thread.H:369
void * threadMain(void)
Definition: IIOThreadedQ.H:30
int resizeBuffers(int N, int ch)
Definition: IIOThreadedQ.H:96
Definition: Thread.H:340
int getReadArraySampleCount(Eigen::Array< TYPE, Eigen::Dynamic, Eigen::Dynamic > &array)
Definition: IIO.H:241
#define NO_ERROR
There is no error.
Definition: Debug.H:33
int read(uint N, const Eigen::Array< TYPE, Eigen::Dynamic, Eigen::Dynamic > &array)
Definition: IIO.H:254
The iio_channel_info structure is external.
Definition: IIO.H:72
unsigned int uint
Definition: Box.H:28
int getReadArray(uint N, Eigen::Array< TYPE, Eigen::Dynamic, Eigen::Dynamic > &array)
Definition: IIO.H:222
int setSampleCountChannelCount(uint N, uint ch)
Definition: IIOThreadedQ.H:127
float sampleRate
Definition: IIOThreadedQ.H:117
int unLock()
Definition: Thread.H:325
void resizeBuffers(int rows, int cols)
Definition: BlockBuffer.H:119
void resize(int count)
Definition: BlockBuffer.H:133
virtual ~IIOThreadedQ()
Definition: IIOThreadedQ.H:125
int lock()
Definition: Thread.H:295
Eigen::Array< unsigned short, Eigen::Dynamic, Eigen::Dynamic > * getEmptyBuffer(void)
Definition: BlockBuffer.H:64
gtkIOStream: /tmp/gtkiostream/include/IIO/IIOThreadedQ.H Source File
GTK+ IOStream  Beta