The ssh2-sftp-client library provides a put method that can accept a readable stream and continuously write its data to a remote file. I recently ran into an issue: if the program terminates just after the write stream has started, the contents of the existing remote file are cleared. This article analyzes the cause.
In the put method, localSrc can be a local file path (a string), a Buffer, or a readable stream; remotePath is the file path on the remote server.
async put(localSrc, remotePath, options) {
try {
if (typeof localSrc === 'string') {
const localCheck = haveLocalAccess(localSrc);
if (!localCheck.status) {
throw this.fmtError(
`Bad path: ${localSrc} ${localCheck.details}`,
'put',
localCheck.code,
);
}
}
return await this._put(localSrc, remotePath, options);
} catch (e) {
throw e.custom ? e : this.fmtError(`Re-thrown: ${e.message}`, 'put', e.code);
}
}
sftp creates a writable stream for the remote file path, and writeStreamOptions configures that stream. The data from the readable stream is then piped into the SFTP writable stream.
// Internal upload worker: creates an SFTP write stream for rPath and feeds it
// from lPath (a Buffer, a local path string, or an object with a pipe() method).
// Returns a Promise. The code that settles it on success is in the elided
// sections of this excerpt ("..."), so it is not documented here.
//   lPath        - upload source
//   rPath        - remote destination path
//   opts         - optional { readStreamOptions, writeStreamOptions, pipeOptions }
//   addListeners - when true (default), temporary listeners are registered for
//                  the duration of the call and removed afterwards
_put(lPath, rPath, opts, addListeners = true) {
let listeners, wtr, rdr;
return new Promise((resolve, reject) => {
if (addListeners) {
listeners = addTempListeners(this, '_put', reject);
}
// Caller options are spread first, so autoClose/end are always forced to true
// and cannot be overridden. Nothing here sets writeStreamOptions.flags; it is
// passed through only if the caller supplied it.
opts = {
readStreamOptions: { ...opts?.readStreamOptions, autoClose: true },
writeStreamOptions: { ...opts?.writeStreamOptions, autoClose: true },
pipeOptions: { ...opts?.pipeOptions, end: true },
};
// NOTE(review): haveConnection presumably calls reject when there is no
// connection - confirm against its definition.
if (haveConnection(this, '_put', reject)) {
// The remote write stream is created before any source data is read.
wtr = this.sftp.createWriteStream(rPath, opts.writeStreamOptions);
...
if (lPath instanceof Buffer) {
this.debugMsg('put source is a buffer');
// A Buffer source is written in one shot and the stream is ended.
wtr.end(lPath);
} else {
// A string source is opened as a local file read stream; anything else is
// used as the reader as-is.
rdr =
typeof lPath === 'string'
? fs.createReadStream(lPath, opts.readStreamOptions)
: lPath;
...
rdr.pipe(wtr, opts.pipeOptions);
}
}
}).finally(() => {
// Runs whether the promise resolved or rejected.
if (addListeners) {
removeTempListeners(this, listeners, '_put');
}
});
}
The SFTP WriteStream inherits from the Node.js writable stream, with some modifications. If flags is not set, it defaults to 'w'.
According to the official Node.js documentation: 'w': Open file for writing. The file is created (if it does not exist) or truncated (if it exists).
So the content is cleared because of this 'w' flag. But how does this flag affect SFTP? Let's continue.
// Constructor for the SFTP write stream (part of the body is elided in this
// excerpt, marked "...").
//   sftp    - SFTP session object used for the remote file operations
//   path    - remote file path
//   options - stream options; those read here: flags, mode, start, autoClose,
//             handle, encoding
function WriteStream(sftp, path, options) {
...
WritableStream.call(this, options);
this.path = path;
// Defaults to 'w' when the caller gives no flags.
this.flags = options.flags === undefined ? 'w' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;
this.start = options.start;
this.autoClose = options.autoClose === undefined ? true : options.autoClose;
this.pos = 0;
this.bytesWritten = 0;
this.isClosed = false;
this.handle = options.handle === undefined ? null : options.handle;
this.sftp = sftp;
this._opening = false;
// An explicit start offset is validated and becomes the initial position.
if (this.start !== undefined) {
checkPosition(this.start, 'start');
this.pos = this.start;
}
if (options.encoding)
this.setDefaultEncoding(options.encoding);
// Node v6.x only
// Skipped when finalCalled is set; otherwise destroy on finish if autoClose.
this.on('finish', function() {
if (this._writableState.finalCalled)
return;
if (this.autoClose)
this.destroy();
});
// Unless an already-open handle (a Buffer) was supplied, open the remote file
// right away, during construction.
if (!Buffer.isBuffer(this.handle))
this.open();
}
The write stream's first step is to open the remote file, and the flags value is passed along when opening. In the sftp open method, stringToFlags(flags_) converts the stream's flag string into the numeric open-flags value (pflags) used by the SFTP protocol.
/**
 * Open the remote file for this stream and announce it with 'open' / 'ready'.
 *
 * Does nothing if an open is already in flight. After the handle is obtained
 * the file mode is applied (fchmod, then chmod by path as a fallback whose
 * result is ignored). For append flags the current file size is looked up
 * (fstat, then stat by path as a fallback) and used as the write position.
 */
WriteStream.prototype.open = function() {
  if (this._opening) {
    return;
  }
  this._opening = true;

  // Tell listeners the stream is usable.
  const announceOpen = (fileHandle) => {
    this.emit('open', fileHandle);
    this.emit('ready');
  };

  // Append mode: start writing at the current end of the file.
  const positionAtEnd = (fileHandle) => {
    const onStat = (statErr, stats) => {
      if (!statErr) {
        this.pos = stats.size;
        announceOpen(fileHandle);
        return;
      }
      // Try stat() for sftp servers that may not support fstat() for
      // whatever reason
      this.sftp.stat(this.path, (pathStatErr, pathStats) => {
        if (pathStatErr) {
          this.destroy();
          // The error reported is the one from the first (fstat) attempt.
          this.emit('error', statErr);
          return;
        }
        onStat(null, pathStats);
      });
    };
    this.sftp.fstat(fileHandle, onStat);
  };

  this.sftp.open(this.path, this.flags, this.mode, (openErr, handle) => {
    this._opening = false;
    if (openErr) {
      this.emit('error', openErr);
      if (this.autoClose) {
        this.destroy();
      }
      return;
    }
    this.handle = handle;

    const afterChmod = (chmodErr) => {
      if (chmodErr) {
        // Try chmod() for sftp servers that may not support fchmod() for
        // whatever reason; its outcome is deliberately not inspected.
        this.sftp.chmod(this.path, this.mode, () => afterChmod());
        return;
      }
      // SFTPv3 requires absolute offsets, no matter the open flag used
      if (this.flags[0] === 'a') {
        positionAtEnd(handle);
      } else {
        announceOpen(handle);
      }
    };

    this.sftp.fchmod(handle, this.mode, afterChmod);
  });
};
open(path, flags_, attrs, cb) {
if (this.server)
throw new Error('Client-only method called in server mode');
if (typeof attrs === 'function') {
cb = attrs;
attrs = undefined;
}
const flags = (typeof flags_ === 'number' ? flags_ : stringToFlags(flags_));
if (flags === null)
throw new Error(`Unknown flags string: ${flags_}`);
let attrsFlags = 0;
let attrsLen = 0;
if (typeof attrs === 'string' || typeof attrs === 'number')
attrs = { mode: attrs };
if (typeof attrs === 'object' && attrs !== null) {
attrs = attrsToBytes(attrs);
attrsFlags = attrs.flags;
attrsLen = attrs.nb;
}
/*
uint32 id
string filename
uint32 pflags
ATTRS attrs
*/
const pathLen = Buffer.byteLength(path);
let p = 9;
const buf = Buffer.allocUnsafe(4 + 1 + 4 + 4 + pathLen + 4 + 4 + attrsLen);
writeUInt32BE(buf, buf.length - 4, 0);
buf[4] = REQUEST.OPEN;
const reqid = this._writeReqid = (this._writeReqid + 1) & MAX_REQID;
writeUInt32BE(buf, reqid, 5);
writeUInt32BE(buf, pathLen, p);
buf.utf8Write(path, p += 4, pathLen);
writeUInt32BE(buf, flags, p += pathLen);
writeUInt32BE(buf, attrsFlags, p += 4);
if (attrsLen) {
p += 4;
if (attrsLen === ATTRS_BUF.length)
buf.set(ATTRS_BUF, p);
else
bufferCopy(ATTRS_BUF, buf, 0, attrsLen, p);
p += attrsLen;
}
this._requests[reqid] = { cb };
const isBuffered = sendOrBuffer(this, buf);
this._debug && this._debug(
`SFTP: Outbound: ${isBuffered ? 'Buffered' : 'Sending'} OPEN`
);
}
// Maps Node-style file flag strings to combinations of OPEN_MODE bits.
// Every 'w' variant includes TRUNC and every 'a' variant includes APPEND;
// both families include CREAT. The 'x' variants add EXCL, '+' adds READ.
const stringFlagMap = (() => {
  const { READ, WRITE, APPEND, CREAT, TRUNC, EXCL } = OPEN_MODE;

  const truncWrite = TRUNC | CREAT | WRITE;
  const truncReadWrite = TRUNC | CREAT | READ | WRITE;
  const appendWrite = APPEND | CREAT | WRITE;
  const appendReadWrite = APPEND | CREAT | READ | WRITE;

  return {
    'r': READ,
    'r+': READ | WRITE,
    'w': truncWrite,
    'wx': truncWrite | EXCL,
    'xw': truncWrite | EXCL,
    'w+': truncReadWrite,
    'wx+': truncReadWrite | EXCL,
    'xw+': truncReadWrite | EXCL,
    'a': appendWrite,
    'ax': appendWrite | EXCL,
    'xa': appendWrite | EXCL,
    'a+': appendReadWrite,
    'ax+': appendReadWrite | EXCL,
    'xa+': appendReadWrite | EXCL,
  };
})();
Now we understand why the file becomes empty: the default 'w' flag maps to TRUNC | CREAT | WRITE, so if the remote file already exists, opening it truncates it before any data has been written.
So how do we avoid this problem?
The solution is to upload to a temporary path first and, once the upload has completed, rename (mv) the temporary file over the original file.

