Astra SDK  v2.1.3
StreamReader.hpp
1 // This file is part of the Orbbec Astra SDK [https://orbbec3d.com]
2 // Copyright (c) 2015-2017 Orbbec 3D
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 // Be excellent to each other.
17 #ifndef ASTRA_STREAMREADER_HPP
18 #define ASTRA_STREAMREADER_HPP
19 
20 #include "capi/astra_core.h"
21 #include <astra_core/FrameListener.hpp>
22 #include <astra_core/Frame.hpp>
23 #include <memory>
24 #include <vector>
25 #include <algorithm>
26 #include <functional>
27 #include <iterator>
28 #include <iostream>
29 
30 namespace astra {
31 
39  {
40  public:
41  StreamReader()
42  : readerRef_(nullptr)
43  {}
44 
45  StreamReader(astra_reader_t reader)
46  : readerRef_(std::make_shared<ReaderRef>(reader))
47  {}
48 
49  StreamReader(const StreamReader& reader)
50  : readerRef_(reader.readerRef_)
51  {}
52 
53  StreamReader& operator=(const StreamReader& reader)
54  {
55  this->readerRef_ = reader.readerRef_;
56  return *this;
57  }
58 
59  template<typename T>
60  T stream()
61  {
62  return stream<T>(DEFAULT_SUBTYPE);
63  }
64 
65  template<typename T>
66  T stream(astra_stream_subtype_t subtype)
67  {
68  if (!is_valid())
69  throw std::logic_error("StreamReader is not associated with a streamset.");
70 
71  astra_streamconnection_t connection;
72  astra_reader_get_stream(readerRef_->get_reader(),
73  T::id,
74  subtype,
75  &connection);
76 
77  return T(connection);
78  }
79 
80 
87  void add_listener(FrameListener& listener)
88  {
89  if (!is_valid())
90  throw std::logic_error("StreamReader is not associated with a streamset.");
91 
92  readerRef_.get()->add_listener(listener);
93  }
94 
102  {
103  if (!is_valid())
104  throw std::logic_error("StreamReader is not associated with a streamset.");
105 
106  readerRef_.get()->remove_listener(listener);
107  }
108 
109  bool is_valid() { return readerRef_ != nullptr; }
110 
111  bool has_new_frame()
112  {
113  if (!is_valid())
114  throw std::logic_error("StreamReader is not associated with a streamset.");
115 
116  bool hasNewFrame = false;
117  astra_reader_has_new_frame(readerRef_->get_reader(), &hasNewFrame);
118  return hasNewFrame;
119  }
120 
121  Frame get_latest_frame(int timeoutMillis = ASTRA_TIMEOUT_FOREVER)
122  {
123  if (!is_valid())
124  throw std::logic_error("StreamReader is not associated with a streamset.");
125 
126  astra_reader_frame_t frame;
127  astra_reader_open_frame(readerRef_->get_reader(), timeoutMillis, &frame);
128 
129  return astra::Frame(frame);
130  }
131 
132  private:
133  class ReaderRef;
134  using ReaderRefPtr = std::shared_ptr<ReaderRef>;
135 
136  StreamReader(ReaderRefPtr readerRef)
137  : readerRef_(readerRef)
138  { }
139 
140  class ReaderRef :
141  public std::enable_shared_from_this<ReaderRef>
142  {
143  public:
144  ReaderRef(astra_reader_t reader)
145  : reader_(reader)
146  { }
147 
148  ~ReaderRef()
149  {
150  listeners_.clear();
151  ensure_callback_removed();
152  astra_reader_destroy(&reader_);
153  }
154 
155  static void ASTRA_CALLBACK frame_ready_thunk(void* clientTag,
156  astra_reader_t reader,
157  astra_reader_frame_t frame)
158  {
159  ReaderRef* self = static_cast<ReaderRef*>(clientTag);
160  self->notify_listeners(frame);
161  }
162 
163  void add_listener(FrameListener& listener)
164  {
165  ensure_callback_added();
166 
167  auto it = std::find(listeners_.begin(),
168  listeners_.end(),
169  listener);
170 
171  if (it != listeners_.end())
172  return;
173 
174  if (isNotifying_)
175  {
176  addedListeners_.push_back(listener);
177  }
178  else
179  {
180  listeners_.push_back(listener);
181  }
182  }
183 
184  void remove_listener(FrameListener& listener)
185  {
186  auto it = std::find(listeners_.begin(),
187  listeners_.end(),
188  listener);
189 
190  if (it == listeners_.end())
191  return;
192 
193  if (isNotifying_)
194  {
195  removedListeners_.push_back(listener);
196  }
197  else
198  {
199  listeners_.erase(it);
200  }
201 
202  if (listeners_.size() == 0)
203  {
204  ensure_callback_removed();
205  }
206  }
207 
208  void notify_listeners(astra_reader_frame_t readerFrame)
209  {
210  if (removedListeners_.size() > 0)
211  {
212  for(FrameListener& listener : removedListeners_)
213  {
214  auto it = std::find(listeners_.begin(),
215  listeners_.end(),
216  listener);
217 
218  listeners_.erase(it);
219  }
220  removedListeners_.clear();
221  }
222 
223  std::move(addedListeners_.begin(),
224  addedListeners_.end(),
225  std::back_inserter(listeners_));
226 
227  if (listeners_.size() == 0)
228  {
229  ensure_callback_removed();
230  return;
231  }
232 
233  //we didn't open the frame, so don't auto close it.
234  //the StreamReader internals will close it automatically
235  const bool autoCloseFrame = false;
236  astra::Frame frameWrapper(readerFrame, autoCloseFrame);
237 
238  isNotifying_ = true;
239  StreamReader reader(shared_from_this());
240  for(FrameListener& listener : listeners_)
241  {
242  listener.on_frame_ready(reader, frameWrapper);
243  }
244 
245  isNotifying_ = false;
246  }
247 
248  astra_reader_t get_reader() { return reader_; }
249 
250  private:
251  void ensure_callback_added()
252  {
253  if (!callbackRegistered_)
254  {
255  astra_reader_register_frame_ready_callback(reader_,
256  &ReaderRef::frame_ready_thunk,
257  this,
258  &callbackId_);
259 
260  callbackRegistered_ = true;
261  }
262  }
263 
264  void ensure_callback_removed()
265  {
266  if (callbackRegistered_)
267  {
268  astra_reader_unregister_frame_ready_callback(&callbackId_);
269  callbackRegistered_ = false;
270  }
271  }
272 
273  astra_reader_t reader_;
274 
275  bool isNotifying_{false};
276  bool callbackRegistered_{false};
277 
278  using ListenerList = std::vector<std::reference_wrapper<FrameListener>>;
279 
280  ListenerList listeners_;
281  ListenerList addedListeners_;
282  ListenerList removedListeners_;
283 
284  astra_reader_callback_id_t callbackId_;
285  };
286 
287  ReaderRefPtr readerRef_;
288 
289  friend bool operator==(const StreamReader& lhs, const StreamReader& rhs);
290  };
291 
292  inline bool operator==(const StreamReader& lhs, const StreamReader& rhs)
293  {
294  return lhs.readerRef_ == rhs.readerRef_;
295  }
296 
297  inline bool operator!=(const StreamReader& lhs, const StreamReader& rhs)
298  {
299  return !(lhs == rhs);
300  }
301 }
302 
303 #endif // ASTRA_STREAMREADER_HPP
Frame class
Definition: Frame.hpp:32
Frame Listener class
Definition: FrameListener.hpp:31
Stream Reader class
Definition: StreamReader.hpp:39
void add_listener(FrameListener &listener)
add listener
Definition: StreamReader.hpp:87
void remove_listener(FrameListener &listener)
remove listener
Definition: StreamReader.hpp:101
bool operator!=(const ImageStreamMode &lhs, const ImageStreamMode &rhs)
Tests whether two ImageStreamMode values are not equal
Definition: Image.hpp:247
bool operator==(const ImageStreamMode &lhs, const ImageStreamMode &rhs)
Tests whether two ImageStreamMode values are equal
Definition: Image.hpp:230
Definition: astra_plugin.h:29
Definition: astra_plugin.h:24