2015-06-30

Code sample: Multi-staged file reading with NSStream from Swift

When reading from a file it is not always necessary to read the whole file in one go. Sometimes it is more advantageous to read only as much data as necessary. The exact amount, however, is often not known in advance and must be read from the file itself, usually from a file header. Since reading data from a file is time consuming, it is also advantageous if the reading can be done asynchronously, as part of the run loop, without blocking other processing.

The following solution implements the above. It provides a class that reads data from a file in a multi-staged operation: it first reads a file header, and then reads as much data from the file as needed. It also has a hook to process the data that was read (though that might not always be possible). Further, it provides a callback on a listener object that is invoked when the necessary data has been read (and possibly the data processing has completed).

The class is used as follows:

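class MyDataModel: ReadyListener {
    
    // Keep a strong reference to the reader: NSStream does not retain its delegate, so the reader must outlive setupFromFile
    
    private var fileReader: MyFileReader?
    
    func setupFromFile(file: NSURL) {
        
        let fr = MyFileReader(listener: self)
        fileReader = fr
        
        fr.startReadingFromFile(file.path!) // Callback can happen from now on, probably shouldn't be called from within an "init"
    }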
    
    
    // Callback from ReadyListener
    
    func fileIsRead(aFile: MyFileReader) {
        if aFile.isValid {
            // .. update data model
        }
    }

}

The instantiation and the start of MyFileReader are separated because callbacks to fileIsRead can happen as soon as startReadingFromFile has been called. If startReadingFromFile were called from within an init, the callback could theoretically arrive before the init is complete. Separating the init from the start of reading allows the object hierarchy to be built before any callbacks occur. (Note that since the callback in this implementation only happens from within a stream delegate call, it can only happen at the end of a run loop cycle. Calling from within an init should therefore be OK; it's just not good practice to open any code up to such potential problems.)
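
The two-phase pattern described above can be sketched in isolation. The following is only a minimal illustration, written in current Swift syntax; it is not the MyFileReader class. The names Worker, WorkerListener and Model are hypothetical, the weak listener reference is an assumption, and the callback is made synchronously just to keep the sketch short.

```swift
// Hypothetical names for illustration only; not part of MyFileReader.
protocol WorkerListener: AnyObject {
    func workerDidFinish(_ worker: Worker)
}

final class Worker {
    // Assumption: a weak reference, to avoid a retain cycle with the owning model.
    private weak var listener: WorkerListener?

    // Phase 1: construction only stores the listener, no callback can occur.
    init(listener: WorkerListener) {
        self.listener = listener
    }

    // Phase 2: callbacks may occur from here on (synchronously in this sketch).
    func start() {
        listener?.workerDidFinish(self)
    }
}

final class Model: WorkerListener {
    private(set) var callbackCount = 0
    private var worker: Worker?

    init() {
        // Safe inside init: creating the worker cannot trigger a callback.
        worker = Worker(listener: self)
    }

    func load() {
        worker?.start()
    }

    func workerDidFinish(_ worker: Worker) {
        callbackCount += 1
    }
}

let model = Model()
let countAfterInit = model.callbackCount   // no callback has happened yet
model.load()
let countAfterLoad = model.callbackCount   // the callback arrived after load()
```

Because the worker cannot call back before start() is invoked, the model can finish building its own state first and decide for itself when callbacks are welcome.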

The implementation of MyFileReader is as follows:

protocol ReadyListener {
    func fileIsRead(aFile: MyFileReader)
}

class MyFileReader: NSObject, NSStreamDelegate {
    
    var isValid: Bool = false
    var error: String = ""
    private let _listener: ReadyListener
    private var _filepath: String = ""
    
    
    /// The listener will not receive any callbacks until after the 'startReadingFromFile' method is called.
    
    init(listener: ReadyListener) {
        _listener = listener
        super.init()
    }
    
    
    /// Starts reading from the given filepath.
    /// If false is returned, the error member will contain information about the error.
    /// If true is returned the listener will receive a callback when file operations are completed.
    
    func startReadingFromFile(filepath: String) -> Bool {
        
        _filepath = filepath
        
        let stream: NSInputStream! = NSInputStream(fileAtPath: filepath)
        
        if stream == nil {
            error = "Could not open stream for \(filepath)"
            return false
        }
        
        stream.delegate = self
        
        stream.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
        
        stream.open()
        
        return true
    }
    
    
    // MARK: - Stream delegate protocol with associated local variables
    
    private var _streamBuffer: Array<UInt8> = Array(count: 1024, repeatedValue: 0)
    private var _nofUnusedStreamBufferBytes: Int = 0
    private var _waitingForStreamData: Bool = true
    
    func stream(aStream: NSStream, handleEvent eventCode: NSStreamEvent) {
        
        switch eventCode {
            
        case NSStreamEvent.ErrorOccurred:
            
            var message = "An error occurred while reading \(_filepath)"
            if let theError = aStream.streamError {
                message = message + " with code = \(theError.code)"
            }
            error = message
            
            closeStream(aStream)
            
            return
            
            
        case NSStreamEvent.EndEncountered:
            
            // Check if all data was read
            
            if _waitingForStreamData  {
                error = "Premature end of file reached for \(_filepath)"
                isValid = false
            }
            
            closeStream(aStream)
            
            return
            
            
        case NSStreamEvent.HasBytesAvailable:
            
            // Initially, and after every call to processStreamBuffer, the streamBuffer is empty. Fill the streamBuffer and then call processStreamBuffer (again). When processStreamBuffer returns, either all data has been read and the stream is closed, or the streamBuffer is empty again and can be filled by the next HasBytesAvailable event.
            
            let nofBytesRead = (aStream as! NSInputStream).read(&_streamBuffer, maxLength: _streamBuffer.count)
            
            
            // Check for errors during the read
            
            if nofBytesRead < 0 {
                error = "Streaming read failed for \(_filepath)"
                closeStream(aStream)
                return
            }
            
            
            // A result of 0 means that no bytes were delivered, so there is nothing to process. The end of the file is handled by the EndEncountered event.
            
            if nofBytesRead == 0 { return }
            
            
            // Remember the number of bytes read
            
            _nofUnusedStreamBufferBytes = nofBytesRead
            
            
            // Process (all of!) the bytes and close the stream if sufficient data has been read.
            
            if processStreamBuffer() { closeStream(aStream) }
            
            
        default: break
        }
    }
    
    private func closeStream(aStream: NSStream) {
        
        aStream.close()
        aStream.removeFromRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
        
        _listener.fileIsRead(self)
        _waitingForStreamData = false
        
        // Null the internal variables that are no longer used to be able to reclaim memory
        
        _streamBuffer = []
        _headerBytes = []
        // _dataBytes = [] do this only when the processing of the data means that this buffer is no longer needed!
    }
    
    // Byte buffers for the header and data
    
    private var _headerBytes: Array<UInt8> = Array(count: 10, repeatedValue: 0)
    private var _headerNofBytes: Int = 0
    private var _headerIsComplete: Bool = false
    private var _size: Int = 0
    private var _dataBytes: Array<UInt8>?
    private var _dataNofBytes: Int = 0
    
    
    // Process the data from the streambuffer, returns true when stream reading can stop, false when not.
    
    private func processStreamBuffer() -> Bool {
        
        let CONTINUE_STREAM_READING       = false
        let STREAM_READING_COMPLETED      = true
        
        
        // The stream buffer is always freshly filled when this method is called
        
        var firstUnusedStreamBufferByte: Int = 0
        
        
        // If not yet complete, read the header
        
        if !_headerIsComplete {
            
            
            // Copy from the streamBuffer to the headerBytes until all header bytes are received.
            
            while _headerNofBytes < _headerBytes.count {
                
                // Copy 1 byte
                
                _headerBytes[_headerNofBytes++] = _streamBuffer[firstUnusedStreamBufferByte++]
                _nofUnusedStreamBufferBytes-- // Housekeeping
                
                
                // If the header is not complete, loop around to copy the next byte, or if exhausted, wait for the next bytes available event.
                
                if (_nofUnusedStreamBufferBytes == 0) && (_headerNofBytes < _headerBytes.count) {
                    return CONTINUE_STREAM_READING
                }
            }
            
            _headerIsComplete = true
            
            // Process the header, check for validity and find out how many bytes must be read.
            
            let sizeOffset = 0 // start at the first byte, adjust as necessary
            
            _size = Int(_headerBytes[sizeOffset])
            _size = _size << 8
            _size = _size | Int(_headerBytes[sizeOffset + 1])
            _size = _size << 8
            _size = _size | Int(_headerBytes[sizeOffset + 2])
            _size = _size << 8
            _size = _size | Int(_headerBytes[sizeOffset + 3])
            
            // Follow up by size validity check....
            
        }
        
        
        // More data available? (Most likely, but we must be sure because we need at least 1 byte for the next bit)
        
        if (_nofUnusedStreamBufferBytes == 0) { return CONTINUE_STREAM_READING }
        
        
        // Create storage for the data bytes if not yet done
        
        if _dataBytes == nil { _dataBytes = Array(count: _size, repeatedValue: 0) }
        
        
        // Copy as many of the remaining bytes as possible, but no more than needed
        
        let neededBytes = _dataBytes!.count - _dataNofBytes
        
        if neededBytes <= _nofUnusedStreamBufferBytes {
            
            // All bytes are available, copy what is needed to the data buffer
            
            _dataBytes![_dataNofBytes ..< _dataBytes!.count] =
                _streamBuffer[firstUnusedStreamBufferByte ..< firstUnusedStreamBufferByte + neededBytes]
            
            _dataNofBytes = _dataBytes!.count
            
        } else {
            
            // All remaining bytes in the stream buffer are needed, add them to the data buffer
            
            _dataBytes![_dataNofBytes ..< _dataNofBytes + _nofUnusedStreamBufferBytes] =
                _streamBuffer[firstUnusedStreamBufferByte ..< firstUnusedStreamBufferByte + _nofUnusedStreamBufferBytes]
            
            _dataNofBytes += _nofUnusedStreamBufferBytes
        }
        
        
        // Check if all data has been read
        
        if _dataNofBytes != _dataBytes!.count { return CONTINUE_STREAM_READING }
        
        
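        // All data is read, mark the result as valid (processData can still reset isValid if the content turns out to be unusable)
        
        isValid = true
        
        processData()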
        
        return STREAM_READING_COMPLETED
    }
    
    private func processData() {
        // ....
    }

}

I hope the code is self-explanatory. Just note that MyFileReader inherits from NSObject and that super.init() *must* be called.
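
For experimenting with the staging logic outside of a run loop, the following sketch feeds chunks of bytes to a small parser by hand. It is written in current Swift syntax rather than the syntax of the listing above, uses a hypothetical StagedParser type that is not part of MyFileReader, and assumes the same layout as the listing: a 10-byte header whose first four bytes hold the data size in big-endian order.

```swift
// Hypothetical helper for illustration; it mirrors the two stages of processStreamBuffer.
struct StagedParser {
    let headerLength = 10                        // assumed header size, as in the listing
    private(set) var header: [UInt8] = []
    private(set) var data: [UInt8] = []
    private(set) var expectedSize: Int? = nil    // known once the header is complete

    init() {}

    /// Feeds one chunk of bytes. Returns true once header and data are complete.
    mutating func feed(_ chunk: [UInt8]) -> Bool {
        var rest = chunk[...]

        // Stage 1: collect header bytes until the header is complete.
        if expectedSize == nil {
            let missing = headerLength - header.count
            header.append(contentsOf: rest.prefix(missing))
            rest = rest.dropFirst(missing)
            if header.count < headerLength { return false }

            // Decode the 4-byte big-endian size at the start of the header.
            let size: Int = header.prefix(4).reduce(0) { ($0 << 8) | Int($1) }
            expectedSize = size
        }

        // Stage 2: collect data bytes, but no more than the header asked for.
        guard let size = expectedSize else { return false }
        let needed = size - data.count
        data.append(contentsOf: rest.prefix(needed))
        return data.count == size
    }
}

// A 10-byte header announcing 3 data bytes, followed by 4 bytes of content.
let file: [UInt8] = [0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0xAA, 0xBB, 0xCC, 0xDD]

// Deliver the bytes in chunks of 4, roughly as successive stream events would.
var parser = StagedParser()
var done = false
var index = 0
while !done && index < file.count {
    let end = min(index + 4, file.count)
    done = parser.feed(Array(file[index..<end]))
    index = end
}
```

With this input the header is completed in the third chunk, and the parser stops after three data bytes even though a fourth byte is present in the last chunk.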

Also note that the byte buffers are explicitly released after the stream is closed, allowing the OS to reclaim their memory. The MyFileReader object can thus continue to exist without the penalty of keeping the now unused buffers allocated. This is especially important when dealing with a lot of files.

Happy coding...
