Customizing Connectors to use the Interconnector Service
 
Required dependencies
Master connector sample code
Slave connector sample code
Interconnector aggregation processor
You can customize connectors to allow the use of the Interconnector service between them.
Required dependencies
You must first add the following JAR files located in <DATADIR>/javabin/plugin to your project:
interconnector-service-java-framework.jar
datainteg-java-commons-queue.jar
Master connector sample code
To allow connections between the connectors and the Interconnector server, first check that an Interconnector server has been deployed in the Administration Console (Deployment > Roles). For more details, see "Configure the Interconnector Server" in the Exalead CloudView Connectors Guide.
You must also add two configuration keys to your connector:
the Interconnector server instance name
the slave connector name
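As a minimal sketch, the two keys could be held and validated as shown below. The class `InterconnectorKeys` and its `requireNonBlank` helper are hypothetical and are not part of the CloudView configuration API. Only the field names `interconnectorServerInstanceName` and `slaveConnector` are taken from the sample code in this section.

```java
/**
 * Hypothetical holder for the two extra configuration keys.
 * This is an illustration only, not the CloudView connector config API.
 */
final class InterconnectorKeys {
    // Name of the Interconnector server instance the queries are sent to
    final String interconnectorServerInstanceName;
    // Name of the slave connector that receives the queries
    final String slaveConnector;

    InterconnectorKeys(String interconnectorServerInstanceName, String slaveConnector) {
        this.interconnectorServerInstanceName =
                requireNonBlank(interconnectorServerInstanceName, "interconnectorServerInstanceName");
        this.slaveConnector = requireNonBlank(slaveConnector, "slaveConnector");
    }

    // Rejects null or blank values early, and trims surrounding whitespace
    private static String requireNonBlank(String value, String keyName) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing configuration key: " + keyName);
        }
        return value.trim();
    }
}
```

Failing fast on a missing key is a design choice for this sketch: it reports a configuration problem before the time-consuming service instantiation starts.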
Below is sample code for your master connector (a JDBC connector in this example).
//Master connector
//While processing a column containing a path, adds a File System Query (FS Query = a document path) to the message bus

//Instantiation of the Interconnector Service
InterConnectorServiceBuilderImpl builder = (InterConnectorServiceBuilderImpl) InterConnectorService.builder();
builder.withDestination(config.slaveConnector); //a configuration key has been added to the connector, to know the
//name of the slave connector
builder.withQuerySerializer(new FileSystemQuerySerializer()); //the file system query serializer (to xml) supplied by
//the JDBC connector
builder.withInterconnectorServerInstance(config.interconnectorServerInstanceName); //a configuration key has been
//added to the connector, to know the interconnector server instance name the query will be sent to
InterConnectorServiceImpl service = builder.build(); //this is time consuming, the service should be instantiated
//only once per application (as a Singleton)
//End of the instantiation

//Creation of the File System Query
FileSystemQuery fileSystemQuery = new FileSystemQuery();
fileSystemQuery.setPath(filePath);
//Calls to the service to delete and add a query
service.deleteQuery(docURI.toString()); //clear the query before adding the new one
service.addQuery(fileSystemQuery, false, true, docURI.toString()); //docURI is the URI of the JDBC document
//that is currently processed

//Creation of the parent document in the Consolidation Box, with type "aggregated"
PushAPITransformationHelpers.addArcTo(document, "parent", docURI.toString() + "_REL");
PushAPITransformationHelpers.setType(document, "aggregated");

//Don't forget to close the service when all the processing is done
service.closeService();
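
The comments above recommend instantiating the service only once per application. One possible way to do this is a lazy, thread-safe holder, sketched below. `LazyService` is a hypothetical helper, not part of the Interconnector API. It assumes the expensive object can be produced by a `Supplier`, for example a lambda that wraps the `builder.build()` call.

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical helper: builds an expensive object at most once and hands the
 * same instance to every caller. Not part of the Interconnector API.
 */
final class LazyService<T> {
    private final Supplier<T> factory;
    // volatile so that a fully built instance is visible to all threads
    private volatile T instance;

    LazyService(Supplier<T> factory) {
        this.factory = Objects.requireNonNull(factory, "factory");
    }

    /** Returns the shared instance, building it on first use (double-checked locking). */
    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    // The factory runs only here, the first time get() is called
                    local = Objects.requireNonNull(factory.get(), "factory returned null");
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

In a connector, the holder would typically be kept in a static field and created with a lambda such as `() -> builder.build()`, so that every processing thread shares the same service. The service still has to be closed once, when all processing is done.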

Slave connector sample code
Below is sample code for your slave connector (a File System connector in this example).
//Slave connector

//Enumerates the watched queries
InterConnectorServiceBuilderImpl builder = (InterConnectorServiceBuilderImpl) InterConnectorService.builder();
builder.withReceiverName(key.connector.getConnectorName()); //the receiver is the connector itself
builder.withQuerySerializer(new FileSystemQuerySerializer());
builder.withInterconnectorServerInstance(config.interconnectorServerInstanceName);
InterConnectorServiceImpl service = builder.build(); //this is time consuming, the service should be instantiated only
//once per application (as a Singleton)
service.pollMessageQueue();
Iterable<ImmutablePair<String, UserPayloadWithUri<String, String>>> tripletIterable = service.getQueries();
Iterator<ImmutablePair<String, UserPayloadWithUri<String, String>>> iteratorQueries = tripletIterable.iterator();

if (iteratorQueries != null && iteratorQueries.hasNext()){
try {
final ImmutablePair<String, UserPayloadWithUri<String, String>> triplet = iteratorQueries.next();
UserPayloadWithUri<String, String> queryAndFlags = triplet.getRight();
Query query = service.getSerializer().deserialize(queryAndFlags.getValue());
String checkpoint = triplet.getLeft();
String filepath = query.getUID();
...
FilesystemKey skey = new FilesystemKey(key.connector, filepath, connectorconfig.createFileFromRootPath
(filesystemRootPathConfig), false, true);
try {
service.notifyEndOfQueryJob(checkpoint);
}
catch (Exception e){
logger.warn("Error while notifying end of query job to storage", e);
}
return (FSKey) skey;
}
catch (Exception e){
logger.error("Error while adding a root key ",e);
return null;
}
}


//Processes a watched query, i.e. a file system path in this connector
//Adding a "parent_uri" meta to link the indexed file system document to the indexed JDBC document
try {
ArrayList<String> listParentURIs = service.getParentURIFromUID(file.getAbsolutePath());
if (listParentURIs != null && !listParentURIs.isEmpty()){
for (String parentURI : listParentURIs){
collect.addMeta("parent_uri", parentURI);
}
}
service.closeService();
}
catch (Exception e ){
logger.debug("Error retrieving parent URI while building PAPI document " + file.getAbsolutePath(), e);
}

//Processes the "parent_uri" metas to create arcs and documents in the consolidation box
Collection<Meta> parents_meta = doc.getMetaContainer().getMetaValues("parent_uri");
if (parents_meta != null && !parents_meta.isEmpty()){
Iterator<Meta> iterator = parents_meta.iterator();
while (iterator.hasNext()){
String uri = iterator.next().getValue();
// creating the "relation" intermediate document in the consolidation box, then link it to the child document
PushAPITransformationHelpers.createUnmanagedDocument(doc, uri + "_REL", "relation");
PushAPITransformationHelpers.addArcFrom(doc, "rel", uri + "_REL");
}
PushAPITransformationHelpers.setType(doc, "child");
}

//Enumerates and processes the deleted queries
Iterable<ImmutablePair<String, UserPayload<String, String>>> deleteIterable = service.getDeletedQueries();
Iterator<ImmutablePair<String, UserPayload<String, String>>> iteratorDeletedQueries = deleteIterable.iterator();


while (iteratorDeletedQueries != null && iteratorDeletedQueries.hasNext()){
final ImmutablePair<String, UserPayload<String, String>> pair = iteratorDeletedQueries.next();
UserPayload<String, String> queryAndFlags = pair.getRight();
Query query = service.getSerializer().deserialize(queryAndFlags.getValue());
String filepath = query.getUID();
papi.deleteDocument(filepath);
}
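
Both samples above derive the URI of the intermediate "relation" document by appending "_REL" to the URI of the JDBC document: the master connector when it adds the "parent" arc, and the slave connector when it creates the relation document and the "rel" arc. The two sides must produce exactly the same string, so one option is to centralize the convention in a small shared helper. The sketch below is illustrative only. `RelationUris` and `relationUriFor` are hypothetical names, and the URI in the usage note is made up.

```java
/**
 * Hypothetical helper that centralizes the "_REL" naming convention used by
 * the master and slave samples. Not part of the CloudView API.
 */
final class RelationUris {
    // Suffix appended to the parent (JDBC) document URI in both samples
    static final String RELATION_SUFFIX = "_REL";

    private RelationUris() {
        // static helper, no instances
    }

    /** Returns the URI of the intermediate relation document for a parent URI. */
    static String relationUriFor(String parentUri) {
        if (parentUri == null || parentUri.isEmpty()) {
            throw new IllegalArgumentException("parentUri must not be empty");
        }
        return parentUri + RELATION_SUFFIX;
    }
}
```

With this helper, the master connector would pass `RelationUris.relationUriFor(docURI.toString())` to `addArcTo`, and the slave connector would pass `RelationUris.relationUriFor(uri)` to `createUnmanagedDocument` and `addArcFrom`. For a made-up parent URI `jdbc://db/row/42`, the helper returns `jdbc://db/row/42_REL`.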
Interconnector aggregation processor
You must now configure the Interconnector aggregation processor in the Administration Console with the appropriate document types and arcs defined in your code. For more details, see "Add the Interconnector aggregation processor" in Exalead CloudView Connectors Guide.
You can then scan your master connector, followed by your slave connector.