The problem can be described by means of this picture:
The video frame can be represented by:
class Frame {
    private final int position;
    private final byte[] data;

    public Frame(int position, byte[] data) {
        this.position = position;
        this.data = data;
    }

    public int getPosition() {
        return position;
    }

    public byte[] getData() {
        return data;
    }
}

The actual processor implementation:
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FrameProcessor {
    private final int threadsNumber;

    public FrameProcessor(int threadsNumber) {
        this.threadsNumber = threadsNumber;
    }

    public List<Frame> getProcessedFrames(List<Frame> framesToProcess) throws Exception {
        // Wrap every frame in a task that a pool thread can execute.
        Function<Frame, Callable<Frame>> transformationFunction = frame -> () -> {
            System.out.println("Processing frame " + frame.getPosition()
                    + " from Thread " + Thread.currentThread().getName());
            Thread.sleep(5); // to simulate CPU intensive computations
            return frame;
        };

        ExecutorService exec = Executors.newFixedThreadPool(threadsNumber);
        List<Future<Frame>> processing;
        try {
            // invokeAll blocks until every task has completed.
            processing = exec.invokeAll(framesToProcess.stream()
                    .map(transformationFunction)
                    .collect(Collectors.toList()));
        } finally {
            exec.shutdown();
        }

        // invokeAll already returns the futures in submission order;
        // the sort is kept as an explicit safeguard on the frame position.
        return processing.stream()
                .map(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })
                .sorted(Comparator.comparingInt(Frame::getPosition))
                .collect(Collectors.toList());
    }
}

And a usage example:
// Requires java.util.ArrayList and java.util.List.
public static void main(String[] args) throws Exception {
    FrameProcessor processor = new FrameProcessor(4);

    List<Frame> framesToProcess = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        framesToProcess.add(new Frame(i, new byte[1024]));
    }

    long start = System.currentTimeMillis();
    for (Frame processedFrame : processor.getProcessedFrames(framesToProcess)) {
        System.out.print("Frame number " + processedFrame.getPosition() + " processed.\n");
    }
    // Elapsed time in milliseconds.
    System.out.println(System.currentTimeMillis() - start);
}

This is only an initial concept of the engine. At a higher level, some buffering and flushing can be added to avoid the infamous OutOfMemoryError. The problem can be generalized as well:
1. Input -> sequential data which is ordered,
2. Concurrent processing of data chunks,
3. Output -> processed sequential data with correct order.
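
The three points above, together with the buffering idea, can be sketched in a generic form. This is only a minimal sketch under my own assumptions, not part of the engine shown earlier: the class name `OrderedParallelMap`, the method `process`, and the parameters `batchSize` and `sink` are all hypothetical. The input is consumed in batches of bounded size, every batch is processed concurrently, and the results are flushed to the sink in the original order before the next batch starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper: names and parameters are assumptions made for illustration.
class OrderedParallelMap {

    /**
     * Processes ordered input in batches of at most batchSize items.
     * Items of one batch run concurrently; their results are passed to the
     * sink in input order, so at most one batch of results is held at a time.
     */
    static <I, O> void process(List<I> input, Function<I, O> worker,
                               int threads, int batchSize, Consumer<O> sink)
            throws InterruptedException, ExecutionException {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
            for (int from = 0; from < input.size(); from += batchSize) {
                int to = Math.min(from + batchSize, input.size());

                // 2. Concurrent processing of the chunks in this batch.
                List<Future<O>> batch = new ArrayList<>();
                for (I item : input.subList(from, to)) {
                    Callable<O> task = () -> worker.apply(item);
                    batch.add(exec.submit(task));
                }

                // 3. Flush in submission order, which equals the input order.
                for (Future<O> result : batch) {
                    sink.accept(result.get());
                }
            }
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Ordered sequential input.
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            input.add(i);
        }

        List<Integer> output = new ArrayList<>();
        process(input, x -> x * x, 4, 3, output::add);
        System.out.println(output); // [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    }
}
```

The sink is always called from the calling thread, so a plain `ArrayList` is enough to collect the results. The trade-off of this simple batching is that the pool idles briefly at the end of each batch while the slowest task finishes; a sliding window over the futures would avoid that at the cost of more code.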